Use client version for stream and consumer extended info

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-01-22 12:58:31 -08:00
parent 6970c25ca0
commit d7cfb8f6e9
7 changed files with 50 additions and 30 deletions

2
go.mod
View File

@@ -6,7 +6,7 @@ require (
github.com/klauspost/compress v1.11.4
github.com/minio/highwayhash v1.0.0
github.com/nats-io/jwt/v2 v2.0.0-20210107222814-18c5cc45d263
github.com/nats-io/nats.go v1.10.1-0.20210115180731-7fb8bacca613
github.com/nats-io/nats.go v1.10.1-0.20210122204956-b8ea7fc17ea6
github.com/nats-io/nkeys v0.2.0
github.com/nats-io/nuid v1.0.1
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897

2
go.sum
View File

@@ -44,6 +44,8 @@ github.com/nats-io/nats.go v1.10.1-0.20210114001154-0a6b5f686ab3 h1:2uYi4zZJ6zni
github.com/nats-io/nats.go v1.10.1-0.20210114001154-0a6b5f686ab3/go.mod h1:Sa3kLIonafChP5IF0b55i9uvGR10I3hPETFbi4+9kOI=
github.com/nats-io/nats.go v1.10.1-0.20210115180731-7fb8bacca613 h1:Zio4IMHHsFjtTeksjF4PySxFNcKwSH9urNIiIW7A/FQ=
github.com/nats-io/nats.go v1.10.1-0.20210115180731-7fb8bacca613/go.mod h1:Sa3kLIonafChP5IF0b55i9uvGR10I3hPETFbi4+9kOI=
github.com/nats-io/nats.go v1.10.1-0.20210122204956-b8ea7fc17ea6 h1:cpS+9uyfHXvRG/Q+WcDd3KXRgPa9fo9tDbIeDHCxYAg=
github.com/nats-io/nats.go v1.10.1-0.20210122204956-b8ea7fc17ea6/go.mod h1:Sa3kLIonafChP5IF0b55i9uvGR10I3hPETFbi4+9kOI=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.2.0 h1:WXKF7diOaPU9cJdLD7nuzwasQy9vT1tBqzXZZf3AMJM=

View File

@@ -776,6 +776,8 @@ func TestJetStreamClusterStreamPublishWithActiveConsumers(t *testing.T) {
// For slight skew in creation time.
ci.Created = ci.Created.Round(time.Second)
ci2.Created = ci2.Created.Round(time.Second)
ci.Cluster = nil
ci2.Cluster = nil
if !reflect.DeepEqual(ci, ci2) {
t.Fatalf("Consumer info did not match: %+v vs %+v", ci, ci2)
@@ -1318,24 +1320,19 @@ func TestJetStreamClusterExtendedStreamInfo(t *testing.T) {
}
}
// TODO(dlc) - Change over to Go client version once it is updated.
resp, err := nc.Request(fmt.Sprintf(server.JSApiStreamInfoT, "TEST"), nil, time.Second)
leader := c.streamLeader("$G", "TEST").Name()
si, err := js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var si server.StreamInfo
if err = json.Unmarshal(resp.Data, &si); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.Cluster == nil {
t.Fatalf("Expected cluster info")
}
if si.Cluster.Name != c.name {
t.Fatalf("Expected cluster name of %q, got %q", c.name, si.Cluster.Name)
}
leader := c.streamLeader("$G", "TEST").Name()
if si.Cluster.Leader != leader {
t.Fatalf("Expected leader of %q, got %q", leader, si.Cluster.Leader)
}
@@ -1355,18 +1352,14 @@ func TestJetStreamClusterExtendedStreamInfo(t *testing.T) {
c.waitOnNewStreamLeader("$G", "TEST")
// Re-request.
resp, err = nc.Request(fmt.Sprintf(server.JSApiStreamInfoT, "TEST"), nil, time.Second)
leader = c.streamLeader("$G", "TEST").Name()
si, err = js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if err = json.Unmarshal(resp.Data, &si); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.Cluster == nil {
t.Fatalf("Expected cluster info")
}
leader = c.streamLeader("$G", "TEST").Name()
if si.Cluster.Leader != leader {
t.Fatalf("Expected leader of %q, got %q", leader, si.Cluster.Leader)
}
@@ -1394,18 +1387,14 @@ func TestJetStreamClusterExtendedStreamInfo(t *testing.T) {
c.waitOnStreamCurrent(oldLeader, "$G", "TEST")
// Re-request.
resp, err = nc.Request(fmt.Sprintf(server.JSApiStreamInfoT, "TEST"), nil, time.Second)
leader = c.streamLeader("$G", "TEST").Name()
si, err = js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if err = json.Unmarshal(resp.Data, &si); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.Cluster == nil {
t.Fatalf("Expected cluster info")
}
leader = c.streamLeader("$G", "TEST").Name()
if si.Cluster.Leader != leader {
t.Fatalf("Expected leader of %q, got %q", leader, si.Cluster.Leader)
}
@@ -1426,16 +1415,12 @@ func TestJetStreamClusterExtendedStreamInfo(t *testing.T) {
defer sub.Unsubscribe()
checkSubsPending(t, sub, 10)
resp, err = nc.Request(fmt.Sprintf(server.JSApiConsumerInfoT, "TEST", "dlc"), nil, time.Second)
leader = c.consumerLeader("$G", "TEST", "dlc").Name()
ci, err := sub.ConsumerInfo()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var ci server.ConsumerInfo
if err = json.Unmarshal(resp.Data, &ci); err != nil {
t.Fatalf("Unexpected error: %v", err)
t.Fatalf("Unexpected error getting consumer info: %v", err)
}
leader = c.consumerLeader("$G", "TEST", "dlc").Name()
if ci.Cluster.Leader != leader {
t.Fatalf("Expected leader of %q, got %q", leader, ci.Cluster.Leader)
}

View File

@@ -22,6 +22,7 @@ import (
"net/http"
"strconv"
"strings"
"sync/atomic"
"time"
)
@@ -389,6 +390,7 @@ type ConsumerInfo struct {
NumRedelivered int `json:"num_redelivered"`
NumWaiting int `json:"num_waiting"`
NumPending uint64 `json:"num_pending"`
Cluster *ClusterInfo `json:"cluster,omitempty"`
}
// SequencePair includes the consumer and stream sequence info from a JetStream consumer.
@@ -827,11 +829,16 @@ func (m *Msg) checkReply() (*js, bool, error) {
}
// ackReply handles all acks. Will do the right thing for pull and sync mode.
// It ensures that an ack is only sent a single time, regardless of
// how many times it is being called to avoid duplicated acks.
func (m *Msg) ackReply(ackType []byte, sync bool) error {
js, isPullMode, err := m.checkReply()
if err != nil {
return err
}
if atomic.LoadUint32(&m.ackd) == 1 {
return ErrInvalidJSAck
}
if isPullMode {
if bytes.Equal(ackType, AckAck) {
err = js.nc.PublishRequest(m.Reply, m.Sub.Subject, AckNext)
@@ -846,6 +853,13 @@ func (m *Msg) ackReply(ackType []byte, sync bool) error {
} else {
err = js.nc.Publish(m.Reply, ackType)
}
// Mark that the message has been acked unless it is AckProgress
// which can be sent many times.
if err == nil && !bytes.Equal(ackType, AckProgress) {
atomic.StoreUint32(&m.ackd, 1)
}
return err
}
@@ -871,7 +885,8 @@ func (m *Msg) Term() error {
return m.ackReply(AckTerm, false)
}
// Indicate that this message is being worked on and reset redelkivery timer in the server.
// InProgress indicates that this message is being worked on
// and reset the redelivery timer in the server.
func (m *Msg) InProgress() error {
return m.ackReply(AckProgress, false)
}

View File

@@ -335,6 +335,7 @@ type StreamInfo struct {
Config StreamConfig `json:"config"`
Created time.Time `json:"created"`
State StreamState `json:"state"`
Cluster *ClusterInfo `json:"cluster,omitempty"`
}
// StreamStats is information about the given stream.
@@ -348,6 +349,22 @@ type StreamState struct {
Consumers int `json:"consumer_count"`
}
// ClusterInfo shows information about the underlying set of servers
// that make up the stream or consumer.
type ClusterInfo struct {
Name string `json:"name,omitempty"`
Leader string `json:"leader,omitempty"`
Replicas []*PeerInfo `json:"replicas,omitempty"`
}
// PeerInfo shows information about all the peers in the cluster that
// are supporting the stream or consumer.
type PeerInfo struct {
Name string `json:"name"`
Current bool `json:"current"`
Active time.Duration `json:"active"`
}
// UpdateStream updates a Stream.
func (js *js) UpdateStream(cfg *StreamConfig) (*StreamInfo, error) {
if cfg == nil || cfg.Name == _EMPTY_ {

View File

@@ -553,6 +553,7 @@ type Msg struct {
Sub *Subscription
next *Msg
barrier *barrierInfo
ackd uint32
}
func (m *Msg) headerBytes() ([]byte, error) {

2
vendor/modules.txt vendored
View File

@@ -7,7 +7,7 @@ github.com/minio/highwayhash
# github.com/nats-io/jwt/v2 v2.0.0-20210107222814-18c5cc45d263
## explicit
github.com/nats-io/jwt/v2
# github.com/nats-io/nats.go v1.10.1-0.20210115180731-7fb8bacca613
# github.com/nats-io/nats.go v1.10.1-0.20210122204956-b8ea7fc17ea6
## explicit
github.com/nats-io/nats.go
github.com/nats-io/nats.go/encoders/builtin