mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
165 lines
3.6 KiB
Go
165 lines
3.6 KiB
Go
// Copyright 2016-2020 The NATS Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package test
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/nats-io/nats-server/v2/server"
|
|
"golang.org/x/time/rate"
|
|
)
|
|
|
|
func checkFor(t *testing.T, totalWait, sleepDur time.Duration, f func() error) {
|
|
t.Helper()
|
|
timeout := time.Now().Add(totalWait)
|
|
var err error
|
|
for time.Now().Before(timeout) {
|
|
err = f()
|
|
if err == nil {
|
|
return
|
|
}
|
|
time.Sleep(sleepDur)
|
|
}
|
|
if err != nil {
|
|
t.Fatal(err.Error())
|
|
}
|
|
}
|
|
|
|
// Slow Proxy - For introducing RTT and BW constraints.
|
|
type slowProxy struct {
|
|
listener net.Listener
|
|
conns []net.Conn
|
|
o *server.Options
|
|
u string
|
|
}
|
|
|
|
func newSlowProxy(rtt time.Duration, up, down int, opts *server.Options) *slowProxy {
|
|
saddr := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
|
|
hp := net.JoinHostPort("127.0.0.1", "0")
|
|
l, e := net.Listen("tcp", hp)
|
|
if e != nil {
|
|
panic(fmt.Sprintf("Error listening on port: %s, %q", hp, e))
|
|
}
|
|
port := l.Addr().(*net.TCPAddr).Port
|
|
sp := &slowProxy{listener: l}
|
|
go func() {
|
|
client, err := l.Accept()
|
|
if err != nil {
|
|
return
|
|
}
|
|
server, err := net.DialTimeout("tcp", saddr, time.Second)
|
|
if err != nil {
|
|
panic("Can't connect to server")
|
|
}
|
|
sp.conns = append(sp.conns, client, server)
|
|
go sp.loop(rtt, up, client, server)
|
|
go sp.loop(rtt, down, server, client)
|
|
}()
|
|
sp.o = &server.Options{Host: "127.0.0.1", Port: port}
|
|
sp.u = fmt.Sprintf("nats://%s:%d", sp.o.Host, sp.o.Port)
|
|
return sp
|
|
}
|
|
|
|
func (sp *slowProxy) opts() *server.Options {
|
|
return sp.o
|
|
}
|
|
|
|
//lint:ignore U1000 Referenced in norace_test.go
|
|
func (sp *slowProxy) clientURL() string {
|
|
return sp.u
|
|
}
|
|
|
|
func (sp *slowProxy) loop(rtt time.Duration, tbw int, r, w net.Conn) {
|
|
delay := rtt / 2
|
|
const rbl = 1024
|
|
var buf [rbl]byte
|
|
ctx := context.Background()
|
|
|
|
rl := rate.NewLimiter(rate.Limit(tbw), rbl)
|
|
|
|
for fr := true; ; {
|
|
sr := time.Now()
|
|
n, err := r.Read(buf[:])
|
|
if err != nil {
|
|
return
|
|
}
|
|
// RTT delays
|
|
if fr || time.Since(sr) > 2*time.Millisecond {
|
|
fr = false
|
|
time.Sleep(delay)
|
|
}
|
|
if err := rl.WaitN(ctx, n); err != nil {
|
|
return
|
|
}
|
|
if _, err = w.Write(buf[:n]); err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (sp *slowProxy) stop() {
|
|
if sp.listener != nil {
|
|
sp.listener.Close()
|
|
sp.listener = nil
|
|
for _, c := range sp.conns {
|
|
c.Close()
|
|
}
|
|
}
|
|
}
|
|
|
|
// Dummy Logger
|
|
type dummyLogger struct {
|
|
sync.Mutex
|
|
msg string
|
|
}
|
|
|
|
func (d *dummyLogger) Fatalf(format string, args ...interface{}) {
|
|
d.Lock()
|
|
d.msg = fmt.Sprintf(format, args...)
|
|
d.Unlock()
|
|
}
|
|
|
|
func (d *dummyLogger) Errorf(format string, args ...interface{}) {
|
|
}
|
|
|
|
func (d *dummyLogger) Debugf(format string, args ...interface{}) {
|
|
}
|
|
|
|
func (d *dummyLogger) Tracef(format string, args ...interface{}) {
|
|
}
|
|
|
|
func (d *dummyLogger) Noticef(format string, args ...interface{}) {
|
|
}
|
|
|
|
func (d *dummyLogger) Warnf(format string, args ...interface{}) {
|
|
}
|
|
|
|
func TestStackFatal(t *testing.T) {
|
|
d := &dummyLogger{}
|
|
stackFatalf(d, "test stack %d", 1)
|
|
if !strings.HasPrefix(d.msg, "test stack 1") {
|
|
t.Fatalf("Unexpected start of stack: %v", d.msg)
|
|
}
|
|
if !strings.Contains(d.msg, "test_test.go") {
|
|
t.Fatalf("Unexpected stack: %v", d.msg)
|
|
}
|
|
}
|