Package io.micronaut.http.body.stream
Class UpstreamBalancer
java.lang.Object
io.micronaut.http.body.stream.UpstreamBalancer
This class merges the backpressure of two data streams. The bytes signaled to the
upstream
is always the minimum of the consumed bytes of the two downstreams.
Implementation notes: This is a bit tricky to implement without locking due to the concurrent nature of
BufferConsumer.Upstream
. Let l
and r
be the total bytes consumed by the left and
right downstreams respectively. We have signalled already the consumption of
min(l, r)
bytes upstream. The AtomicLong
stores the difference l-r
.
Now, assume the left downstream (wlog) signals consumption of n
further bytes. There
are three cases:
l>r
, thusl-r>0
: right downstream is already lagging behind, don't send any demand upstreaml<r
, andl+n<r
: left downstream stays lagging behind, send the fulln
demand upstreaml<r
, butl+n>r
: left downstream was lagging behind but now right downstream is. Just sendr-l=abs(l-r)
upstream
The last two cases can be combined into sending min(n, abs(l-r))
upstream. So we
only need to test for the first case.
-
Nested Class Summary
-
Method Summary
Modifier and TypeMethodDescriptionbalancer
(BufferConsumer.Upstream upstream, ByteBody.SplitBackpressureMode mode) Create a pair ofBufferConsumer.Upstream
instances that delegates to the givenupstream
according to the semantics of the givenmode
.fastest
(BufferConsumer.Upstream upstream) Implementation ofByteBody.SplitBackpressureMode.FASTEST
.first
(BufferConsumer.Upstream upstream) Implementation ofByteBody.SplitBackpressureMode.ORIGINAL
andByteBody.SplitBackpressureMode.NEW
.slowest
(BufferConsumer.Upstream upstream) Implementation ofByteBody.SplitBackpressureMode.SLOWEST
.
-
Method Details
-
slowest
Implementation ofByteBody.SplitBackpressureMode.SLOWEST
.- Parameters:
upstream
- The original upstream- Returns:
- The balanced upstreams
-
fastest
Implementation ofByteBody.SplitBackpressureMode.FASTEST
.- Parameters:
upstream
- The original upstream- Returns:
- The balanced upstreams
-
first
Implementation ofByteBody.SplitBackpressureMode.ORIGINAL
andByteBody.SplitBackpressureMode.NEW
.- Parameters:
upstream
- The original upstream- Returns:
- The balanced upstreams
-
balancer
public static UpstreamBalancer.UpstreamPair balancer(BufferConsumer.Upstream upstream, ByteBody.SplitBackpressureMode mode) Create a pair ofBufferConsumer.Upstream
instances that delegates to the givenupstream
according to the semantics of the givenmode
.- Parameters:
upstream
- The original upstreammode
- The balancing mode- Returns:
- The balanced upstreams
-