1+ /*
2+ * Copyright 2016-2020 David Karnok
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License");
5+ * you may not use this file except in compliance with the License.
6+ * You may obtain a copy of the License at
7+ *
8+ * http://www.apache.org/licenses/LICENSE-2.0
9+ *
10+ * Unless required by applicable law or agreed to in writing, software
11+ * distributed under the License is distributed on an "AS IS" BASIS,
12+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ * See the License for the specific language governing permissions and
14+ * limitations under the License.
15+ */
16+
17+ package hu .akarnokd .rxjava3 .interop ;
18+
19+ import java .util .concurrent .atomic .AtomicLong ;
20+
21+ import io .reactivex .rxjava3 .annotations .NonNull ;
22+ import io .reactivex .rxjava3 .plugins .RxJavaPlugins ;
23+
24+ /**
25+ * Utility class to help with backpressure-related operations such as request aggregation.
26+ */
27+ final class BackpressureHelper {
28+ /** Utility class. */
29+ private BackpressureHelper () {
30+ throw new IllegalStateException ("No instances!" );
31+ }
32+
33+ /**
34+ * Adds two long values and caps the sum at {@link Long#MAX_VALUE}.
35+ * @param a the first value
36+ * @param b the second value
37+ * @return the sum capped at {@link Long#MAX_VALUE}
38+ */
39+ public static long addCap (long a , long b ) {
40+ long u = a + b ;
41+ if (u < 0L ) {
42+ return Long .MAX_VALUE ;
43+ }
44+ return u ;
45+ }
46+
47+ /**
48+ * Multiplies two long values and caps the product at {@link Long#MAX_VALUE}.
49+ * @param a the first value
50+ * @param b the second value
51+ * @return the product capped at {@link Long#MAX_VALUE}
52+ */
53+ public static long multiplyCap (long a , long b ) {
54+ long u = a * b ;
55+ if (((a | b ) >>> 31 ) != 0 ) {
56+ if (u / a != b ) {
57+ return Long .MAX_VALUE ;
58+ }
59+ }
60+ return u ;
61+ }
62+
63+ /**
64+ * Atomically adds the positive value n to the requested value in the {@link AtomicLong} and
65+ * caps the result at {@link Long#MAX_VALUE} and returns the previous value.
66+ * @param requested the {@code AtomicLong} holding the current requested value
67+ * @param n the value to add, must be positive (not verified)
68+ * @return the original value before the add
69+ */
70+ public static long add (@ NonNull AtomicLong requested , long n ) {
71+ for (;;) {
72+ long r = requested .get ();
73+ if (r == Long .MAX_VALUE ) {
74+ return Long .MAX_VALUE ;
75+ }
76+ long u = addCap (r , n );
77+ if (requested .compareAndSet (r , u )) {
78+ return r ;
79+ }
80+ }
81+ }
82+
83+ /**
84+ * Atomically adds the positive value n to the requested value in the {@link AtomicLong} and
85+ * caps the result at {@link Long#MAX_VALUE} and returns the previous value and
86+ * considers {@link Long#MIN_VALUE} as a cancel indication (no addition then).
87+ * @param requested the {@code AtomicLong} holding the current requested value
88+ * @param n the value to add, must be positive (not verified)
89+ * @return the original value before the add
90+ */
91+ public static long addCancel (@ NonNull AtomicLong requested , long n ) {
92+ for (;;) {
93+ long r = requested .get ();
94+ if (r == Long .MIN_VALUE ) {
95+ return Long .MIN_VALUE ;
96+ }
97+ if (r == Long .MAX_VALUE ) {
98+ return Long .MAX_VALUE ;
99+ }
100+ long u = addCap (r , n );
101+ if (requested .compareAndSet (r , u )) {
102+ return r ;
103+ }
104+ }
105+ }
106+
107+ /**
108+ * Atomically subtract the given number (positive, not validated) from the target field unless it contains {@link Long#MAX_VALUE}.
109+ * @param requested the target field holding the current requested amount
110+ * @param n the produced element count, positive (not validated)
111+ * @return the new amount
112+ */
113+ public static long produced (@ NonNull AtomicLong requested , long n ) {
114+ for (;;) {
115+ long current = requested .get ();
116+ if (current == Long .MAX_VALUE ) {
117+ return Long .MAX_VALUE ;
118+ }
119+ long update = current - n ;
120+ if (update < 0L ) {
121+ RxJavaPlugins .onError (new IllegalStateException ("More produced than requested: " + update ));
122+ update = 0L ;
123+ }
124+ if (requested .compareAndSet (current , update )) {
125+ return update ;
126+ }
127+ }
128+ }
129+
130+ /**
131+ * Atomically subtract the given number (positive, not validated) from the target field if
132+ * it doesn't contain {@link Long#MIN_VALUE} (indicating some cancelled state) or {@link Long#MAX_VALUE} (unbounded mode).
133+ * @param requested the target field holding the current requested amount
134+ * @param n the produced element count, positive (not validated)
135+ * @return the new amount
136+ */
137+ public static long producedCancel (@ NonNull AtomicLong requested , long n ) {
138+ for (;;) {
139+ long current = requested .get ();
140+ if (current == Long .MIN_VALUE ) {
141+ return Long .MIN_VALUE ;
142+ }
143+ if (current == Long .MAX_VALUE ) {
144+ return Long .MAX_VALUE ;
145+ }
146+ long update = current - n ;
147+ if (update < 0L ) {
148+ RxJavaPlugins .onError (new IllegalStateException ("More produced than requested: " + update ));
149+ update = 0L ;
150+ }
151+ if (requested .compareAndSet (current , update )) {
152+ return update ;
153+ }
154+ }
155+ }
156+ }
0 commit comments