Bug fix for original replay with no initial interest, README updates

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2019-12-05 14:26:39 -08:00
parent 0aa80dc79c
commit ca5b58e27f
3 changed files with 98 additions and 2 deletions

View File

@@ -339,7 +339,7 @@ Listening on [d.p2]
[#5] Received on [5]: 'hello'
```
Creating the interest triggers delivery of the messages. Something to note here that is new with JetStream, the subscription is just a regular subscription on `d.p2` however the messages are delivered with the original subject. Also note that this observable was not affected bu the other observable we created early. Running this command will show now messages since we create the oversable above with the ack none policy, which means once the message is delivered it is considered ack'd. Use `nats-pub` to send more messages, remember the message set's interest will match any single token subject. So if in a different tab or window you do the following you will see it immediately delivered.
Creating the interest triggers delivery of the messages. Something to note here that is new with JetStream, the subscription is just a regular subscription on `d.p2` however the messages are delivered with the original subject. Also note that this observable was not affected by the other observable we created early. Running the `nats-sub` command again will show no messages since we created the oversable above with the ack none policy. This means once the message is delivered it is considered ack'd. Use `nats-pub` to send more messages. Remember the message set's interest will match any single token subject. So if in a different tab or window you do the following you will see it immediately delivered to the subscriber.
```bash
> nats-pub foo "hello jetsream"
@@ -350,6 +350,33 @@ Listening on [d.p2]
[#1] Received on [foo]: 'hello world'
```
Now lets create another observable, very similar to the one above, but this time ask the system to replay the messages at the same rate they were originally published.
```bash
> jsm add-obs
Enter the following information
Message Set Name: wc
Durable Name: p3
Push or Pull: push
Delivery Subject: d.p3
Deliver All? (Y|n):
AckPolicy (None|all|explicit):
Replay Policy (Instant|original): original
Received response of "+OK"
```
Now when we create our subscriber, the messages will be delivered at the same interval they were received.
```bash
> nats-sub -t d.p3
Listening on [d.p3]
2019/12/05 14:25:23 [#1] Received on [1]: 'hello'
2019/12/05 14:25:27 [#2] Received on [2]: 'hello'
2019/12/05 14:25:29 [#3] Received on [3]: 'hello'
2019/12/05 14:25:31 [#4] Received on [4]: 'hello'
2019/12/05 14:25:34 [#5] Received on [4]: 'hello'
```
## Next Steps
There is plenty to more to discuss and features to describe. We will continue to add things here and feel free to post any questions on the JetStream Slack channel. For the brave, take a look at `nats-server/test/jetstream_test.go` for all that jetstream can do. And please file and issues or communicate on slack or on email.

View File

@@ -682,6 +682,14 @@ func (o *Observable) processReplay() error {
return fmt.Errorf("observable not valid")
}
// If push mode but we have no interest wait for it to show up.
if o.isPushMode() && !o.active {
// We will wait here for new messages to arrive.
o.mu.Unlock()
mset.waitForMsgs()
continue
}
subj, msg, ts, err := o.mset.store.LoadMsg(o.sseq)
if err != nil && err != ErrStoreMsgNotFound {
o.mu.Unlock()

View File

@@ -1411,7 +1411,7 @@ func TestJetStreamObservableReconnect(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// For test speed. Thresh is high to avoid us being deleted.
// For test speed. Thresh is too high to avoid us being deleted.
o.SetActiveCheckParams(50*time.Millisecond, 100)
if !o.Active() {
@@ -2160,6 +2160,67 @@ func TestJetStreamObservableReplayRate(t *testing.T) {
}
}
func TestJetStreamObservableReplayRateNoAck(t *testing.T) {
cases := []struct {
name string
mconfig *server.MsgSetConfig
}{
{"MemoryStore", &server.MsgSetConfig{Name: "DC", Storage: server.MemoryStorage}},
{"FileStore", &server.MsgSetConfig{Name: "DC", Storage: server.FileStorage}},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
mset, err := s.GlobalAccount().AddMsgSet(c.mconfig)
if err != nil {
t.Fatalf("Unexpected error adding message set: %v", err)
}
defer mset.Delete()
nc := clientConnectToServer(t, s)
defer nc.Close()
// Send 10 msgs
totalMsgs := 10
for i := 0; i < totalMsgs; i++ {
nc.Request("DC", []byte("Hello World"), time.Second)
time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
}
if stats := mset.Stats(); stats.Msgs != uint64(totalMsgs) {
t.Fatalf("Expected %d messages, got %d", totalMsgs, stats.Msgs)
}
subj := "d.dc"
o, err := mset.AddObservable(&server.ObservableConfig{
Durable: "derek",
Delivery: subj,
DeliverAll: true,
AckPolicy: server.AckNone,
ReplayPolicy: server.ReplayOriginal,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer o.Delete()
o.SetActiveCheckParams(50*time.Millisecond, 100)
// Sleep a random amount of time.
time.Sleep(time.Duration(rand.Intn(20)) * time.Millisecond)
sub, _ := nc.SubscribeSync(subj)
nc.Flush()
checkFor(t, time.Second, 25*time.Millisecond, func() error {
if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != totalMsgs {
return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, totalMsgs)
}
return nil
})
})
}
}
func TestJetStreamObservableReplayQuit(t *testing.T) {
cases := []struct {
name string