Record the stream and consumer info timestamps

This records the server time when info for streams and
consumers are created so that tools such as the nats cli
can calculate time deltas for last ack, last delivered and
so forth in the context of the server clock.

This will help aleviate problems with client devices experiencing
clock jitter that can show up in user interfaces as negative
seconds since last ack etc

Signed-off-by: R.I.Pienaar <rip@devco.net>
This commit is contained in:
R.I.Pienaar
2023-05-05 16:40:28 +02:00
parent 69fb3db0f5
commit fb1d86d506
4 changed files with 75 additions and 46 deletions

View File

@@ -55,6 +55,8 @@ type ConsumerInfo struct {
NumPending uint64 `json:"num_pending"`
Cluster *ClusterInfo `json:"cluster,omitempty"`
PushBound bool `json:"push_bound,omitempty"`
// TimeStamp indicates when the info was gathered
TimeStamp time.Time `json:"ts"`
}
type ConsumerConfig struct {
@@ -2394,6 +2396,7 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
NumRedelivered: len(o.rdc),
NumPending: o.checkNumPending(),
PushBound: o.isPushMode() && o.active,
TimeStamp: time.Now().UTC(),
}
// If we are replicated and we are not the leader we need to pull certain data from our store.

View File

@@ -1373,9 +1373,10 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account,
return
}
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
TimeStamp: time.Now().UTC(),
}
resp.DidCreate = true
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
@@ -1461,12 +1462,13 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account,
}
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Domain: s.getOpts().JetStreamDomain,
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Domain: s.getOpts().JetStreamDomain,
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
TimeStamp: time.Now().UTC(),
}
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
}
@@ -1686,12 +1688,13 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s
for _, mset := range msets[offset:] {
config := mset.config()
resp.Streams = append(resp.Streams, &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: config,
Domain: s.getOpts().JetStreamDomain,
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
Created: mset.createdTime(),
State: mset.state(),
Config: config,
Domain: s.getOpts().JetStreamDomain,
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
TimeStamp: time.Now().UTC(),
})
if len(resp.Streams) >= JSApiListLimit {
break
@@ -1846,6 +1849,7 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
Alternates: js.streamAlternates(ci, config.Name),
TimeStamp: time.Now().UTC(),
}
if clusterWideConsCount > 0 {
resp.StreamInfo.State.Consumers = clusterWideConsCount
@@ -3455,7 +3459,12 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC
s.Warnf("Restore failed for %s for stream '%s > %s' in %v",
friendlyBytes(int64(total)), streamName, acc.Name, end.Sub(start))
} else {
resp.StreamInfo = &StreamInfo{Created: mset.createdTime(), State: mset.state(), Config: mset.config()}
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
TimeStamp: time.Now().UTC(),
}
s.Noticef("Completed restore of %s for stream '%s > %s' in %v",
friendlyBytes(int64(total)), streamName, acc.Name, end.Sub(start))
}
@@ -4222,10 +4231,11 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
// our config and defaults for state and no cluster info.
if isMember {
resp.ConsumerInfo = &ConsumerInfo{
Stream: ca.Stream,
Name: ca.Name,
Created: ca.Created,
Config: ca.Config,
Stream: ca.Stream,
Name: ca.Name,
Created: ca.Created,
Config: ca.Config,
TimeStamp: time.Now().UTC(),
}
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
}

View File

@@ -2932,12 +2932,13 @@ func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) {
s.sendAPIErrResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
} else {
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
TimeStamp: time.Now().UTC(),
}
resp.DidCreate = true
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
@@ -3339,12 +3340,13 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
// Send our response.
var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Cluster: js.clusterInfo(mset.raftGroup()),
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Cluster: js.clusterInfo(mset.raftGroup()),
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
TimeStamp: time.Now().UTC(),
}
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
@@ -3398,12 +3400,13 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
if !recovering {
var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
TimeStamp: time.Now().UTC(),
}
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
}
@@ -6264,7 +6267,12 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filt
for _, sa := range streams {
if s.allPeersOffline(sa.Group) {
// Place offline onto our results by hand here.
si := &StreamInfo{Config: *sa.Config, Created: sa.Created, Cluster: js.offlineClusterInfo(sa.Group)}
si := &StreamInfo{
Config: *sa.Config,
Created: sa.Created,
Cluster: js.offlineClusterInfo(sa.Group),
TimeStamp: time.Now().UTC(),
}
resp.Streams = append(resp.Streams, si)
missingNames = append(missingNames, sa.Config.Name)
} else {
@@ -6410,7 +6418,12 @@ func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, of
for _, ca := range consumers {
if s.allPeersOffline(ca.Group) {
// Place offline onto our results by hand here.
ci := &ConsumerInfo{Config: ca.Config, Created: ca.Created, Cluster: js.offlineClusterInfo(ca.Group)}
ci := &ConsumerInfo{
Config: ca.Config,
Created: ca.Created,
Cluster: js.offlineClusterInfo(ca.Group),
TimeStamp: time.Now().UTC(),
}
resp.Consumers = append(resp.Consumers, ci)
missingNames = append(missingNames, ca.Name)
} else {
@@ -7993,12 +8006,13 @@ func (mset *stream) processClusterStreamInfoRequest(reply string) {
}
si := &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: config,
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
Created: mset.createdTime(),
State: mset.state(),
Config: config,
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
TimeStamp: time.Now().UTC(),
}
// Check for out of band catchups.

View File

@@ -136,6 +136,8 @@ type StreamInfo struct {
Mirror *StreamSourceInfo `json:"mirror,omitempty"`
Sources []*StreamSourceInfo `json:"sources,omitempty"`
Alternates []StreamAlternate `json:"alternates,omitempty"`
// TimeStamp indicates when the info was gathered
TimeStamp time.Time `json:"ts"`
}
type StreamAlternate struct {