diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 23fa5311..16e9cebf 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -344,7 +344,7 @@ func TestJetStreamBasicDelivery(t *testing.T) { // Now let's check the messages for i := 0; i < toSend; i++ { - m, _ := sub.NextMsg(time.Millisecond) + m, _ := sub.NextMsg(time.Second) // JetStream will have the subject match the stream subject, not delivery subject. if m.Subject != sendSubj { t.Fatalf("Expected original subject of %q, but got %q", sendSubj, m.Subject) @@ -387,13 +387,10 @@ func TestJetStreamBasicDelivery(t *testing.T) { } defer o.Delete() - checkFor(t, 250*time.Millisecond, 10*time.Millisecond, func() error { - if nmsgs, _, _ := sub.Pending(); err != nil || nmsgs != 1 { - return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, toSend) - } - return nil - }) - m, _ := sub.NextMsg(time.Millisecond) + m, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Did not get expected message, got %v", err) + } if seq := o.SeqFromReply(m.Reply); seq != 200 { t.Fatalf("Expected sequence to be 200, but got %d", seq) } @@ -407,7 +404,8 @@ func TestJetStreamBasicDelivery(t *testing.T) { } defer o.Delete() - if m, err := sub.NextMsg(time.Millisecond); err == nil { + // Make sure we only got one message. + if m, err := sub.NextMsg(5 * time.Millisecond); err == nil { t.Fatalf("Expected no msg, got %+v", m) } @@ -465,11 +463,6 @@ func TestJetStreamBasicWorkQueue(t *testing.T) { // For normal work queue semantics, you send requests to the subject with message set and observable name. reqMsgSubj := fmt.Sprintf("%s.%s.%s", server.JsReqPre, mname, oname) - // FIXME(dlc) - Right now this will *not* work with new style nc.Request(). - // The new Request() mux in client needs the original subject to de-mux. Will panic. - // Working on a fix, but for now revert back to old style. - nc.Opts.UseOldRequestStyle = true - getNext := func(seqno int) { t.Helper() nextMsg, err := nc.Request(reqMsgSubj, nil, time.Second) @@ -491,7 +484,7 @@ func TestJetStreamBasicWorkQueue(t *testing.T) { // Now we want to make sure we can get a message that is published to the message // set as we are waiting for it. - nextDelay := 100 * time.Millisecond + nextDelay := 50 * time.Millisecond go func() { time.Sleep(nextDelay) @@ -509,7 +502,7 @@ func TestJetStreamBasicWorkQueue(t *testing.T) { time.Sleep(nextDelay) for i := 0; i < toSend; i++ { nc.Request(sendSubj, []byte("Hello World!"), 50*time.Millisecond) - time.Sleep(5 * time.Millisecond) + time.Sleep(time.Millisecond) } }() @@ -643,7 +636,7 @@ func TestJetStreamPartitioning(t *testing.T) { // Now let's check the messages for i := 1; i <= toSend; i++ { - m, err := sub.NextMsg(time.Millisecond) + m, err := sub.NextMsg(time.Second) if err != nil { t.Fatalf("Unexpected error: %v", err) }