This could happen when a consumer had not sent anything to the
attached NATS subscription and there was a consumer leader
step down or server restart.
Signed-off-by: Derek Collison <derek@nats.io>
We were getting a data race checking the js.clustered field in
updateUsage() following fix for lock inversion in PR #3092.
```
=== RUN TestJetStreamClusterKVMultipleConcurrentCreate
==================
WARNING: DATA RACE
Read at 0x00c0009db5d8 by goroutine 195:
github.com/nats-io/nats-server/v2/server.(*jsAccount).updateUsage()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/jetstream.go:1681 +0x8f
github.com/nats-io/nats-server/v2/server.(*stream).storeUpdates()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/stream.go:2927 +0x1d9
github.com/nats-io/nats-server/v2/server.(*stream).storeUpdates-fm()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/stream.go:2905 +0x7d
github.com/nats-io/nats-server/v2/server.(*fileStore).removeMsg()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/filestore.go:2158 +0x14f7
github.com/nats-io/nats-server/v2/server.(*fileStore).expireMsgs()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/filestore.go:2777 +0x18f
github.com/nats-io/nats-server/v2/server.(*fileStore).expireMsgs-fm()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/filestore.go:2770 +0x39
Previous write at 0x00c0009db5d8 by goroutine 128:
github.com/nats-io/nats-server/v2/server.(*jetStream).setupMetaGroup()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/jetstream_cluster.go:604 +0xfae
github.com/nats-io/nats-server/v2/server.(*Server).enableJetStreamClustering()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/jetstream_cluster.go:514 +0x20a
github.com/nats-io/nats-server/v2/server.(*Server).enableJetStream()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/jetstream.go:400 +0x1168
github.com/nats-io/nats-server/v2/server.(*Server).EnableJetStream()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/jetstream.go:206 +0x651
github.com/nats-io/nats-server/v2/server.(*Server).Start()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/server.go:1746 +0x1804
github.com/nats-io/nats-server/v2/server.RunServer·dwrap·4269()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/server_test.go:90 +0x39
Goroutine 195 (running) created at:
time.goFunc()
/home/travis/.gimme/versions/go1.17.9.linux.amd64/src/time/sleep.go:180 +0x49
Goroutine 128 (finished) created at:
github.com/nats-io/nats-server/v2/server.RunServer()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/server_test.go:90 +0x278
github.com/nats-io/nats-server/v2/server.RunServerWithConfig()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/server_test.go:112 +0x44
github.com/nats-io/nats-server/v2/server.(*cluster).restartServer()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/jetstream_helpers_test.go:1004 +0x1d5
github.com/nats-io/nats-server/v2/server.TestJetStreamClusterKVMultipleConcurrentCreate()
/home/travis/gopath/src/github.com/nats-io/nats-server/server/jetstream_cluster_test.go:8463 +0x64b
testing.tRunner()
/home/travis/.gimme/versions/go1.17.9.linux.amd64/src/testing/testing.go:1259 +0x22f
testing.(*T).Run·dwrap·21()
/home/travis/.gimme/versions/go1.17.9.linux.amd64/src/testing/testing.go:1306 +0x47
==================
```
Running that test with adding some delay in several places also showed another race:
```
==================
WARNING: DATA RACE
Read at 0x00c00016adb8 by goroutine 160:
github.com/nats-io/nats-server/v2/server.(*fileStore).expireMsgs()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/filestore.go:2777 +0x106
github.com/nats-io/nats-server/v2/server.(*fileStore).expireMsgs-fm()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/filestore.go:2771 +0x39
Previous write at 0x00c00016adb8 by goroutine 32:
github.com/nats-io/nats-server/v2/server.(*fileStore).UpdateConfig()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/filestore.go:360 +0x1c8
github.com/nats-io/nats-server/v2/server.(*stream).update()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/stream.go:1360 +0x852
github.com/nats-io/nats-server/v2/server.(*jetStream).processClusterCreateStream()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/jetstream_cluster.go:2704 +0x4a4
github.com/nats-io/nats-server/v2/server.(*jetStream).processStreamAssignment()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/jetstream_cluster.go:2452 +0xad9
github.com/nats-io/nats-server/v2/server.(*jetStream).applyMetaEntries()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/jetstream_cluster.go:1407 +0x7e4
github.com/nats-io/nats-server/v2/server.(*jetStream).monitorCluster()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/jetstream_cluster.go:887 +0xc75
github.com/nats-io/nats-server/v2/server.(*jetStream).monitorCluster-fm()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/jetstream_cluster.go:813 +0x39
Goroutine 160 (running) created at:
time.goFunc()
/usr/local/go/src/time/sleep.go:180 +0x49
Goroutine 32 (running) created at:
github.com/nats-io/nats-server/v2/server.(*Server).startGoRoutine()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server.go:3013 +0x86
github.com/nats-io/nats-server/v2/server.(*jetStream).setupMetaGroup()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/jetstream_cluster.go:612 +0x1092
github.com/nats-io/nats-server/v2/server.(*Server).enableJetStreamClustering()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/jetstream_cluster.go:514 +0x20a
github.com/nats-io/nats-server/v2/server.(*Server).enableJetStream()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/jetstream.go:400 +0x1168
github.com/nats-io/nats-server/v2/server.(*Server).EnableJetStream()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/jetstream.go:206 +0x651
github.com/nats-io/nats-server/v2/server.(*Server).Start()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server.go:1746 +0x1804
github.com/nats-io/nats-server/v2/server.RunServer·dwrap·4275()
/Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server_test.go:90 +0x39
==================
```
Both are now addressed, either with proper locking, or with the use of an atomic in the place
where we cannot get the lock (without re-introducing the lock inversion issue).
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
When updating usage, there is a lock inversion in that the jetStream
lock was acquired while under the stream's (mset) lock, which is
not correct. Also, updateUsage was locking the jsAccount lock, which
again, is not really correct since jsAccount contains streams, so
it should be jsAccount->stream, not the other way around.
Removed the locking of jetStream to check for clustered state since
js.clustered is immutable.
Replaced using jsAccount lock to update usage with a dedicated lock.
Originally moved all the update/limit fields in jsAccount to new
structure to make sure that I would see all code that is updating
or reading those fields, and also all functions so that I could
make sure that I use the new lock when calling these. Once that
works was done, and to reduce code changes, I put the fields back
into jsAccount (although I grouped them under the new usageMu mutex
field).
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
When deciding to compact a file, we need to remove from the raw
bytes the empty records, otherwise, for small messages, we would
end-up calling compact() too many times.
When removing a message from the stream, in FIFO cases we would
write the index every 2 seconds at most when doing it in place,
when when dealing with out of order deletes, we would do it for
every single delete, which can be costly. We are now writing
only every 500ms for non FIFO cases.
Also fixed some unrelated code:
- Decision to install a snapshot was based on incorrect logical
expression
- In checkPending(), protect against the timer being nil which
could happen when consumer is stopped or leadership change.
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
Step down timing for consumers or streams.
Signals loss of leadership and sleeps before stepping down.
This makes it less likely that messages are being processed during step
down.
When becoming leader, consumer stream seqno got reset,
even though the consumer existed already.
Proper cleanup of redelivery data structures and timer
Signed-off-by: Matthias Hanel <mh@synadia.com>
This is a continuation of PR #3060, but extends to clustering.
Verified with manual test that a mirror created with v2.7.4 has
the duplicates window set and on restart with main would still
complain about use of dedup in cluster mode. The mirror stream
was recovered but showing as R1.
With this fix, a restart of the cluster - with existing data -
will properly recover the stream as an R3 and messages that
were published while in a bad state are synchronized.
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
Signed-off-by: Matthias Hanel mh@synadia.com
* [Fixed] limits enforcement issues
stream create had checks that stream restore did not have.
Moved code into commonly used function checkStreamCfg.
Also introduced (cluster/non clustered) StreamLimitsCheck functions to
perform checks specific to clustered /non clustered data structures.
Checking for valid stream config and limits/reservations before
receiving all the data. Now fails the request right away.
Added a jetstream limit "max_request_batch" to limit fetch batch size
Shortened max name length from 256 to 255, more common file name limit
Added check for loop in cyclic source stream configurations
features related to limits
Signed-off-by: Matthias Hanel <mh@synadia.com>
During elected stepdown and transfer allow the new leader to take over before we stepdown.
We could receive a leader change, so make sure to also check migration state.
Signed-off-by: Derek Collison <derek@nats.io>
* [Adding] max_ha_assets to limit placement on server with more ha assets
server running more than max_ha_assets #raft nodes will not be used to
place new streams and fail if not enough free server can be found.
Durable Consumer creation on such server will fail as their peer size is
bound to the same set as their stream.
This also avoids updating placement where no new placement is needed.
This is the case when, on update, placement tags get removed.
Signed-off-by: Matthias Hanel <mh@synadia.com>
- A stream could become leader when it should not, causing
messages to be lost.
- A catchup could stall because the server sending data
could bail out of the runCatchup routine but still send
the EOF signal.
- Deadlock with monitoring of Jsz
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
Signed-off-by: Derek Collison <derek@nats.io>
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
The system will allow an update to a stream, and subsequently all attached consumers, to be placed in another cluster either directly or via tag placement.
The meta layer will scale the underlying peerset appropriately to straddle the two clusters for both the stream and consumers, taking into account the consumer type.
Control will then pass to the current leaders of the assets who will monitor the catchup status of the new peers.
(Note we can optimize this later to only traverse once across a GW for any given asset, but for now this is simpler)
Once the original leaders have determined the assets are synched it will pass leadership to a member of the new peerset.
Once the new leader has been elected, it will forward a request for the meta layer to shrink the peerset by removing the old peers.
Signed-off-by: Derek Collison <derek@nats.io>
Some warnings, especially when dealing with JS limits that were
printed on a per-message basis, are now limited to ~1 per second
if the content of the warning is already found in a map.
This is also for "client" warnings, but the client porting of the
warning is not taken into account so that helps with reducing logging
for similar content, but coming from different clients.
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
adds unit test to test this scenario
improves reporting of correct error
only show info for non existing tiers where streams exist
Signed-off-by: Matthias Hanel <mh@synadia.com>
* added max_ack_penind setting to js account limits
because of the addition, defaults now have to be set later (depend on
these new limits now)
also re-organized the code to closer track how stream create looks
Signed-off-by: Matthias Hanel <mh@synadia.com>
Also fixed a bug that could cause memory based replicated consumers to no longer work after snapshots and server restarts.
The snapshot logic would allow non-state changing updates to continously grow the raft logs. We also were too conservative on when we snapshotted and why.
Also added in ability to have FileStore.Compact() reclaim space from the block file from the head of last changed block.
Signed-off-by: Derek Collison <derek@nats.io>
* Adding server limits (max ack pending/dedupe window) to js config
Also shifting consumer config check to jsConsumerCreate as in clustered
mode this was enforced in the wrong place
Signed-off-by: Matthias Hanel <mh@synadia.com>
Also fixed a bug where we were incorrectly not spining up the monitoring loop for a stream when going from 3->1->3.
Signed-off-by: Derek Collison <derek@nats.io>
Previously we would rely more heavily on Go's garbage collector since when we loaded a block for an underlying stream we would pass references upward to avoimd copies.
Now we always copy when passing back to the upper layers which allows us to not only expire our cache blocks but pool and reuse them.
The upper layers also had changes made to allow the pooling layer at that level to interoperate with the storage layer optionally.
Also fixed some flappers and a bug where de-dupe might not be reformed correctly.
Signed-off-by: Derek Collison <derek@nats.io>
I got this panic in a test:
```
=== RUN TestJetStreamClusterAccountLoadFailure
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x78 pc=0xb1501b]
goroutine 47853 [running]:
github.com/nats-io/nats-server/v2/server.(*jetStream).processLeaderChange(0xc000b60580, 0x0)
/home/travis/gopath/src/github.com/nats-io/nats-server/server/jetstream_cluster.go:3638 +0x9b
github.com/nats-io/nats-server/v2/server.(*jetStream).monitorCluster(0xc000b60580)
/home/travis/gopath/src/github.com/nats-io/nats-server/server/jetstream_cluster.go:853 +0x60f
created by github.com/nats-io/nats-server/v2/server.(*Server).startGoRoutine
/home/travis/gopath/src/github.com/nats-io/nats-server/server/server.go:3017 +0x87
FAIL github.com/nats-io/nats-server/v2/server 227.888s
```
which from that branch would point to function processLeaderChange()
line:
```
} else if node := js.getMetaGroup().GroupLeader(); node == _EMPTY_ {
```
which I guess meant that getMetaGroup() was returning `nil`.
Refactored a bit to get the group leader in 2 steps.
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
Removed the warnings, instead have a sync.Map where they are
registered/unregistered and can be inspected with an undocumented
monitor page.
Added the notion of "in progress" which is the number of messages
that have beend pop()'ed. When recycle() is invoked this count
goes down.
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>