Simplify FlowMap write path with synchronized + @Volatile#7
Merged
Conversation
Drop the per-subscriber TreeMap reordering (orderedSignal) by serializing writes through a single synchronized block. The previous design CAS'd the state version then tryEmit'd asynchronously, so concurrent writers could emit out of order; each subscriber then rebuffered via a TreeMap to restore version order. With writes serialized at the producer, the whole reordering layer disappears (~40 lines removed) and most paths get faster. Also add benchmarks that exercise real mutations (putChanging, putCycling, putAllLarge, putChangingWithSubscribers) and contended writes (concurrentPutCycling @threads(4)). The existing putSingle and putWithSubscribers put the same key/value repeatedly, so they short-circuit after iteration 1 and measure the no-op fast path, not the put. Throughput deltas (before -> after): asFlowWithStateCollection +63% (subscriber initial collection) asFlowCollection +16% putAllSmall +14% concurrentPutCycling (4 threads) +12% putWithSubscribers, putSingle +3% putAllLarge, getFromLarge ~flat putChanging -9% putAndRemove -14% putChangingWithSubscribers(1) -19% putCycling (256 keys, single) -39% Subscriber and collection paths see large gains. Single-threaded mutating writes regress because each put now pays both a synchronized monitor and a separate MutableStateFlow.value setter (its own internal CAS). A follow-up replacing MutableStateFlow with @volatile var (valueFlow derived from signal) should reclaim that.
With writes already serialized through the synchronized block, the
MutableStateFlow was redundant - every put paid for a monitor
acquire/release AND the StateFlow's internal CAS-based setter. Removing
the second atomic op reclaims the write-path regressions the previous
commit introduced.
valueFlow now derives from `signal` instead of `state`. Same per-key
semantics; the one behavioural difference is that slow valueFlow
consumers no longer benefit from StateFlow's implicit conflation, which
matches the documented contract ("Events can be conflated with
[conflate]" - opt-in, not implicit).
Throughput deltas vs the previous commit (ops/ms):
putCycling +87% (was -39% vs original)
putChangingWithSubscribers(1) +28%
putChanging +24% (was -9% vs original)
putAndRemove +19% (was -14% vs original)
concurrentPutCycling +17%
putChangingWithSubscribers(100) +8%
most others ~flat or small wins
asFlowCollection -9% (still +5% vs original)
asFlowWithStateCollection -28% (still +17% vs original)
Write paths are now clear wins instead of the trade-off they were after
synchronized alone. Subscriber-collection gives back some of the gain
but stays ahead of the original.
The existing putChangingWithSubscribers measures writer-only throughput: events go into the MutableSharedFlow buffer (Int.MAX_VALUE capacity) and the writer returns without waiting for subscribers to consume. That hides the real fan-out cost and produces a misleading non-monotonic curve where n=10 looks slower than n=100 - the writer is paying full O(N) wake-up cost at n=10 but mostly just buffering at n=100. putChangingWithDrainingSubscribers blocks after each put until every subscriber has actually run its onEach. End-to-end results are monotonic in subscriber count, as expected: n=1 | 278 ops/ms (vs writer-only 3819 - 14x ahead) n=10 | 60 ops/ms (vs writer-only 62 - same; subscribers keep up) n=100 | 6.2 ops/ms (vs writer-only 566 - 91x ahead) Subscribers signal via an AtomicLong counter that the writer spin-waits on. A blocking Phaser/CountDownLatch on the subscriber side would deadlock with 100 subscribers on Dispatchers.Default (~12 threads) - workers would park before all parties could arrive. Class-level KDoc table refreshed with this run's numbers, including the new benchmark.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Serialize FlowMap writes through a
synchronizedblock (drops the per-subscriberTreeMapreordering layer) and back the state with a@Volatile varinstead ofMutableStateFlow(one atomic op per write instead of two).Throughput vs
main:Small wins elsewhere; no regressions outside noise. End-to-end drained throughput is unchanged (dispatcher-limited).