We were getting a data race checking the js.clustered field in
updateUsage() following fix for lock inversion in PR #3092.
```
=== RUN TestJetStreamClusterKVMultipleConcurrentCreate
==================
WARNING: DATA RACE
Read at 0x00c0009db5d8 by goroutine 195:
github.com/nats-io/nats-server/v2/server.(*jsAccount).updateUsage()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/jetstream.go:1681 +0x8f
github.com/nats-io/nats-server/v2/server.(*stream).storeUpdates()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/stream.go:2927 +0x1d9
github.com/nats-io/nats-server/v2/server.(*stream).storeUpdates-fm()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/stream.go:2905 +0x7d
github.com/nats-io/nats-server/v2/server.(*fileStore).removeMsg()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/filestore.go:2158 +0x14f7
github.com/nats-io/nats-server/v2/server.(*fileStore).expireMsgs()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/filestore.go:2777 +0x18f
github.com/nats-io/nats-server/v2/server.(*fileStore).expireMsgs-fm()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/filestore.go:2770 +0x39
Previous write at 0x00c0009db5d8 by goroutine 128:
github.com/nats-io/nats-server/v2/server.(*jetStream).setupMetaGroup()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/jetstream_cluster.go:604 +0xfae
github.com/nats-io/nats-server/v2/server.(*Server).enableJetStreamClustering()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/jetstream_cluster.go:514 +0x20a
github.com/nats-io/nats-server/v2/server.(*Server).enableJetStream()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/jetstream.go:400 +0x1168
github.com/nats-io/nats-server/v2/server.(*Server).EnableJetStream()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/jetstream.go:206 +0x651
github.com/nats-io/nats-server/v2/server.(*Server).Start()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/server.go:1746 +0x1804
github.com/nats-io/nats-server/v2/server.RunServer·dwrap·4269()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/server_test.go:90 +0x39
Goroutine 195 (running) created at:
time.goFunc()
/home/travis/.gimme/versions/go1.17.9.linux.amd64/src/time/sleep.go:180 +0x49
Goroutine 128 (finished) created at:
github.com/nats-io/nats-server/v2/server.RunServer()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/server_test.go:90 +0x278
github.com/nats-io/nats-server/v2/server.RunServerWithConfig()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/server_test.go:112 +0x44
github.com/nats-io/nats-server/v2/server.(*cluster).restartServer()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/jetstream_helpers_test.go:1004 +0x1d5
github.com/nats-io/nats-server/v2/server.TestJetStreamClusterKVMultipleConcurrentCreate()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/jetstream_cluster_test.go:8463 +0x64b
testing.tRunner()
/home/travis/.gimme/versions/go1.17.9.linux.amd64/src/testing/testing.go:1259 +0x22f
testing.(*T).Run·dwrap·21()
/home/travis/.gimme/versions/go1.17.9.linux.amd64/src/testing/testing.go:1306 +0x47
==================
```
Running that test with adding some delay in several places also showed another race:
```
==================
WARNING: DATA RACE
Read at 0x00c00016adb8 by goroutine 160:
github.com/nats-io/nats-server/v2/server.(*fileStore).expireMsgs()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/filestore.go:2777 +0x106
github.com/nats-io/nats-server/v2/server.(*fileStore).expireMsgs-fm()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/filestore.go:2771 +0x39
Previous write at 0x00c00016adb8 by goroutine 32:
github.com/nats-io/nats-server/v2/server.(*fileStore).UpdateConfig()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/filestore.go:360 +0x1c8
github.com/nats-io/nats-server/v2/server.(*stream).update()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/stream.go:1360 +0x852
github.com/nats-io/nats-server/v2/server.(*jetStream).processClusterCreateStream()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/jetstream_cluster.go:2704 +0x4a4
github.com/nats-io/nats-server/v2/server.(*jetStream).processStreamAssignment()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/jetstream_cluster.go:2452 +0xad9
github.com/nats-io/nats-server/v2/server.(*jetStream).applyMetaEntries()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/jetstream_cluster.go:1407 +0x7e4
github.com/nats-io/nats-server/v2/server.(*jetStream).monitorCluster()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/jetstream_cluster.go:887 +0xc75
github.com/nats-io/nats-server/v2/server.(*jetStream).monitorCluster-fm()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/jetstream_cluster.go:813 +0x39
Goroutine 160 (running) created at:
time.goFunc()
/usr/local/go/src/time/sleep.go:180 +0x49
Goroutine 32 (running) created at:
github.com/nats-io/nats-server/v2/server.(*Server).startGoRoutine()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server.go:3013 +0x86
github.com/nats-io/nats-server/v2/server.(*jetStream).setupMetaGroup()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/jetstream_cluster.go:612 +0x1092
github.com/nats-io/nats-server/v2/server.(*Server).enableJetStreamClustering()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/jetstream_cluster.go:514 +0x20a
github.com/nats-io/nats-server/v2/server.(*Server).enableJetStream()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/jetstream.go:400 +0x1168
github.com/nats-io/nats-server/v2/server.(*Server).EnableJetStream()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/jetstream.go:206 +0x651
github.com/nats-io/nats-server/v2/server.(*Server).Start()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server.go:1746 +0x1804
github.com/nats-io/nats-server/v2/server.RunServer·dwrap·4275()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server_test.go:90 +0x39
==================
```
Both are now addressed, either with proper locking, or with the use of an atomic in the place
where we cannot get the lock (without re-introducing the lock inversion issue).
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
Before PR #3099, `waitQueue.isEmpty()` returned `wq.len() == 0`
and `waitQueue.len()` was protecting against the pointer being
nil (and then return 0).
The change in #3099 caused `waitQueue.isEmpty()` to return `wq.n == 0`,
which means that if `wq` was nil, then it would crash.
This PR restores `waitQueue.isEmpty()` to return `wq.len() == 0` and
add the protection for waitQueue being nil in `len()` similar to
how it was prior to PR #3099.
Resolves#3117
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This would happen in situation where a node receives an append
entry with a term higher than the node's (current leader).
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
I tracked down this issue to have been introduced with PR #2369,
but the code also touched PR #1891 and PR #3088.
I added a test as described in issue #3108 but did not need
JetStream to demonstrate the issue. With the proposed fix, all
tests that were added in aforementioned PRs still pass, including
the new test.
Resolves#3108
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
When a downstream stream uses retention modes that delete messages, fallback to timebased start time for the new source consumers.
Signed-off-by: Derek Collison <derek@nats.io>
There was a case where we may have done a check for max-per-subject
limit twice per message. That would apply to streams that have
max-per-subject and also discard_new, which is what KV configures.
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
There was an issue with MaxWaiting==1 that was causing a request
with expiration to actually not expire. This was because processWaiting
would not pick it up because wq.rp was actually equal to wq.wp
(that is, the read pointer was equal to write pointer for a slice
of capacity of 1).
The other issue was that when reaching the maximum of waiting pull
requests, a new request would evict an old one with a "408 Request Canceled".
There is no reason for that, instead the server will first try to
find some existing expired requests (since some of the expiration
is lazily done), but if none is expired, and the queue is full,
the server will return a "409 Exceeded MaxWaiting" to the new
request, and not a "408 Request Canceled" to an old one...
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
When updating usage, there is a lock inversion in that the jetStream
lock was acquired while under the stream's (mset) lock, which is
not correct. Also, updateUsage was locking the jsAccount lock, which
again, is not really correct since jsAccount contains streams, so
it should be jsAccount->stream, not the other way around.
Removed the locking of jetStream to check for clustered state since
js.clustered is immutable.
Replaced using jsAccount lock to update usage with a dedicated lock.
Originally moved all the update/limit fields in jsAccount to new
structure to make sure that I would see all code that is updating
or reading those fields, and also all functions so that I could
make sure that I use the new lock when calling these. Once that
works was done, and to reduce code changes, I put the fields back
into jsAccount (although I grouped them under the new usageMu mutex
field).
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
When deciding to compact a file, we need to remove from the raw
bytes the empty records, otherwise, for small messages, we would
end-up calling compact() too many times.
When removing a message from the stream, in FIFO cases we would
write the index every 2 seconds at most when doing it in place,
when when dealing with out of order deletes, we would do it for
every single delete, which can be costly. We are now writing
only every 500ms for non FIFO cases.
Also fixed some unrelated code:
- Decision to install a snapshot was based on incorrect logical
expression
- In checkPending(), protect against the timer being nil which
could happen when consumer is stopped or leadership change.
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
Step down timing for consumers or streams.
Signals loss of leadership and sleeps before stepping down.
This makes it less likely that messages are being processed during step
down.
When becoming leader, consumer stream seqno got reset,
even though the consumer existed already.
Proper cleanup of redelivery data structures and timer
Signed-off-by: Matthias Hanel <mh@synadia.com>
- Remove code coverage from Travis and add it to a GitHub Action
that will be run as a nightly.
- Use tag builds to exclude some tests, such as the "norace" or
JS tests. Since "go test" does not support "negative" regexs, there
is no other way.
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This is a continuation of PR #3060, but extends to clustering.
Verified with manual test that a mirror created with v2.7.4 has
the duplicates window set and on restart with main would still
complain about use of dedup in cluster mode. The mirror stream
was recovered but showing as R1.
With this fix, a restart of the cluster - with existing data -
will properly recover the stream as an R3 and messages that
were published while in a bad state are synchronized.
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
Signed-off-by: Matthias Hanel mh@synadia.com