mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Fixed issue with route s2_auto when compression is actually off
This is a fix for PR https://github.com/nats-io/nats-server/pull/4001. If a server has an s2_auto configuration, the compression level needs to be updated based on the RTT, however, this should not happen if a particular route is actually not using compression, either because it is a connection to an older server or the other side has explicitly configure compression to be "off". Extended a test that would have caught this issue. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -2389,12 +2389,12 @@ func (c *client) processPong() {
|
||||
c.rtt = computeRTT(c.rttStart)
|
||||
srv := c.srv
|
||||
reorderGWs := c.kind == GATEWAY && c.gw.outbound
|
||||
// For routes, check if we have compression auto and if we should change
|
||||
// the compression level. However, exclude the route if compression is
|
||||
// "not supported", which indicates that this is a route to an older server.
|
||||
if c.kind == ROUTER && c.route.compression != CompressionNotSupported {
|
||||
if opts := srv.getOpts(); opts.Cluster.Compression.Mode == CompressionS2Auto {
|
||||
if cm := selectS2AutoModeBasedOnRTT(c.rtt, opts.Cluster.Compression.RTTThresholds); cm != c.route.compression {
|
||||
// If compression is currently active for a route connection, if the
|
||||
// compression configuration is s2_auto, check if we should change
|
||||
// the compression level.
|
||||
if c.kind == ROUTER && needsCompression(c.route.compression) {
|
||||
if co := &(srv.getOpts().Cluster.Compression); co.Mode == CompressionS2Auto {
|
||||
if cm := selectS2AutoModeBasedOnRTT(c.rtt, co.RTTThresholds); cm != c.route.compression {
|
||||
c.route.compression = cm
|
||||
c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...)
|
||||
}
|
||||
|
||||
@@ -3637,6 +3637,9 @@ func TestRouteCompressionAuto(t *testing.T) {
|
||||
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
|
||||
s2.mu.RLock()
|
||||
defer s2.mu.RUnlock()
|
||||
if n := s2.numRoutes(); n != 4 {
|
||||
return fmt.Errorf("Cluster not formed properly, got %v routes", n)
|
||||
}
|
||||
var err error
|
||||
s2.forEachRoute(func(r *client) {
|
||||
if err != nil {
|
||||
@@ -3694,6 +3697,18 @@ func TestRouteCompressionAuto(t *testing.T) {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
np.updateRTT(25 * time.Millisecond)
|
||||
checkComp(CompressionS2Fast)
|
||||
|
||||
// Now disable compression on s1
|
||||
reloadUpdateConfig(t, s1, conf1, fmt.Sprintf(tmpl, "A", "10s", "off", _EMPTY_))
|
||||
// Wait a bit to make sure we don't check for cluster too soon since
|
||||
// we expect a disconnect.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
checkClusterFormed(t, s1, s2)
|
||||
// Now change the RTT values in the proxy.
|
||||
np.updateRTT(1 * time.Millisecond)
|
||||
// Now check that s2 also shows as "off". Wait for some ping intervals.
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
checkComp(CompressionOff)
|
||||
}
|
||||
|
||||
func TestRoutePings(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user