diff --git a/reactor-extra/src/main/java/reactor/math/MathFlux.java b/reactor-extra/src/main/java/reactor/math/MathFlux.java index 8fa71335b..410beeb55 100644 --- a/reactor-extra/src/main/java/reactor/math/MathFlux.java +++ b/reactor-extra/src/main/java/reactor/math/MathFlux.java @@ -143,6 +143,10 @@ public static final Mono sumDouble(Publisher source, Function + * Further conversion of individual elements to {@link BigDecimal} is always applied to + * retain maximum precision. When the sum is ultimately produced, it can be rounded or + * truncated as a {@link BigDecimal#toBigInteger()} final conversion is applied. * * @param source the numerical source * @return {@link Mono} of the sum of items in source @@ -154,6 +158,10 @@ public static Mono sumBigInteger(Publisher source) /** * Computes the {@link BigInteger} sum of items in the source, which are mapped to * numerical values using provided mapping. + *

+ * Further conversion of individual elements to {@link BigDecimal} is always applied to + * retain maximum precision. When the sum is ultimately produced, it can be rounded or + * truncated as a {@link BigDecimal#toBigInteger()} final conversion is applied. * * @param source the source items * @param mapping a function to map source items to numerical values @@ -237,6 +245,11 @@ public static final Mono averageDouble(Publisher source, Function /** * Computes the {@link BigInteger} average of items in the source. + *

+ * The average is computed by summing {@link BigDecimal}-converted values and ultimately + * dividing that number by the number of elements. Eventually the result of that division + * can be rounded or truncated, as the {@link java.math.RoundingMode#FLOOR FLOOR rounding mode} + * is applied and the {@link BigDecimal#toBigInteger()} final conversion is performed. * * @param source the numerical source * @return {@link Mono} of the average of items in source @@ -248,6 +261,12 @@ public static Mono averageBigInteger(Publisher sou /** * Computes the {@link BigInteger} average of items in the source, which are mapped to * numerical values using the provided mapping. + *

+ * Further conversion of individual elements to {@link BigDecimal} is always applied. + * The average is computed by summing {@link BigDecimal}-converted values and ultimately + * dividing that number by the number of elements. Eventually the result of that division + * can be rounded or truncated, as the {@link java.math.RoundingMode#FLOOR FLOOR rounding mode} + * is applied and the {@link BigDecimal#toBigInteger()} final conversion is performed. * * @param source the source items * @param mapping a function to map source items to numerical values @@ -255,7 +274,7 @@ public static Mono averageBigInteger(Publisher sou */ public static final Mono averageBigInteger(Publisher source, Function mapping) { - return MathMono.onAssembly(new MonoAverageBigInteger(source, mapping)); + return MathMono.onAssembly(new MonoAverageBigInteger<>(source, mapping)); } diff --git a/reactor-extra/src/main/java/reactor/math/MonoAverageBigInteger.java b/reactor-extra/src/main/java/reactor/math/MonoAverageBigInteger.java index d49f2b219..1937da468 100644 --- a/reactor-extra/src/main/java/reactor/math/MonoAverageBigInteger.java +++ b/reactor-extra/src/main/java/reactor/math/MonoAverageBigInteger.java @@ -16,7 +16,9 @@ package reactor.math; +import java.math.BigDecimal; import java.math.BigInteger; +import java.math.RoundingMode; import java.util.function.Function; import org.reactivestreams.Publisher; @@ -52,7 +54,7 @@ private static final class AverageBigIntegerSubscriber private int count; - private BigInteger sum = BigInteger.ZERO; + private BigDecimal sum = BigDecimal.ZERO; AverageBigIntegerSubscriber(CoreSubscriber actual, Function mapping) { @@ -63,19 +65,28 @@ private static final class AverageBigIntegerSubscriber @Override protected void reset() { count = 0; - sum = BigInteger.ZERO; + sum = BigDecimal.ZERO; } @Override protected BigInteger result() { - return (count == 0 ? null : sum.divide(BigInteger.valueOf(count))); + return (count == 0 ? null : sum.divide(BigDecimal.valueOf(count), RoundingMode.FLOOR).toBigInteger()); } @Override protected void updateResult(T newValue) { Number number = mapping.apply(newValue); - BigInteger bigIntegerValue = BigInteger.valueOf(number.longValue()); - sum = sum.add(bigIntegerValue); + BigDecimal bigDecimalValue; + if (number instanceof BigDecimal) { + bigDecimalValue = (BigDecimal) number; + } + else if (number instanceof BigInteger) { + bigDecimalValue = new BigDecimal((BigInteger) number); + } + else { + bigDecimalValue = new BigDecimal(number.toString()); + } + sum = sum.add(bigDecimalValue); count++; } } diff --git a/reactor-extra/src/main/java/reactor/math/MonoSumBigInteger.java b/reactor-extra/src/main/java/reactor/math/MonoSumBigInteger.java index e6b06f232..36aa436cb 100644 --- a/reactor-extra/src/main/java/reactor/math/MonoSumBigInteger.java +++ b/reactor-extra/src/main/java/reactor/math/MonoSumBigInteger.java @@ -16,6 +16,7 @@ package reactor.math; +import java.math.BigDecimal; import java.math.BigInteger; import java.util.function.Function; @@ -50,7 +51,7 @@ static final private class SumBigIntegerSubscriber private final Function mapping; - BigInteger sum; + BigDecimal sum; boolean hasValue; @@ -62,20 +63,29 @@ static final private class SumBigIntegerSubscriber @Override protected void reset() { - sum = BigInteger.ZERO; + sum = BigDecimal.ZERO; hasValue = false; } @Override protected BigInteger result() { - return (hasValue ? sum : null); + return (hasValue ? sum.toBigInteger() : null); } @Override protected void updateResult(T newValue) { Number number = mapping.apply(newValue); - BigInteger bigIntegerValue = BigInteger.valueOf(number.longValue()); - sum = hasValue ? sum.add(bigIntegerValue) : bigIntegerValue; + BigDecimal bigDecimalValue; + if (number instanceof BigDecimal) { + bigDecimalValue = (BigDecimal) number; + } + else if (number instanceof BigInteger) { + bigDecimalValue = new BigDecimal((BigInteger) number); + } + else { + bigDecimalValue = new BigDecimal(number.toString()); + } + sum = hasValue ? sum.add(bigDecimalValue) : bigDecimalValue; hasValue = true; } } diff --git a/reactor-extra/src/test/java/reactor/math/ReactorMathTests.java b/reactor-extra/src/test/java/reactor/math/ReactorMathTests.java index 7a3ae6bd4..b6841cacd 100644 --- a/reactor-extra/src/test/java/reactor/math/ReactorMathTests.java +++ b/reactor-extra/src/test/java/reactor/math/ReactorMathTests.java @@ -177,6 +177,41 @@ public void emptySumBigInteger() { verifyEmptyResult(MathFlux.sumBigInteger(Mono.empty())); } + @Test + public void noOverflowSumBigInteger() { + long longValue = 1100000000000000L; + String bigValue = "12319800000000000000"; + verifyResult(MathFlux.sumBigInteger(Mono.just(BigInteger.valueOf(longValue))), BigInteger.valueOf(longValue)); + verifyResult(MathFlux.sumBigInteger(Mono.just(new BigInteger(bigValue, 10))), new BigInteger(bigValue, 10)); + verifyResult( + MathFlux.sumBigInteger(Flux.just(BigInteger.valueOf(longValue), new BigInteger(bigValue, 10))), + new BigInteger(bigValue, 10).add(BigInteger.valueOf(longValue)) + ); + } + + @Test + public void noProgressivePrecisionLossSumBigInteger() { + //smaller than 0.5d, but the fractional parts are individually taken into account + double overflowingValue1 = 0.2d; + double overflowingValue2 = 0.4d; + double overflowingValue3 = 0.4d; + + verifyResult(MathFlux.sumBigInteger(Flux.just(Long.MAX_VALUE, overflowingValue1, overflowingValue2, overflowingValue3)), + BigInteger.valueOf(Long.MAX_VALUE).add(BigInteger.ONE)); + } + + @Test + public void finalPrecisionLossSumBigInteger() { + //smaller than 0.5d, but the fractional parts are individually taken into account + //but at the end the extra 0.2 is dropped + double overflowingValue1 = 0.4d; + double overflowingValue2 = 0.4d; + double overflowingValue3 = 0.4d; + + verifyResult(MathFlux.sumBigInteger(Flux.just(Long.MAX_VALUE, overflowingValue1, overflowingValue2, overflowingValue3)), + BigInteger.valueOf(Long.MAX_VALUE).add(BigInteger.ONE)); + } + @Test public void fluxSumBigDecimal() { int count = 10; @@ -317,6 +352,33 @@ public void emptyAverageBigInteger() { verifyEmptyResult(MathFlux.averageBigInteger(Mono.empty())); } + @Test + public void noOverflowAverageBigInteger() { + long longValue = 1100000000000000L; + String bigValue = "12319800000000000000"; + verifyResult(MathFlux.averageBigInteger(Mono.just(BigInteger.valueOf(longValue))), BigInteger.valueOf(longValue)); + verifyResult(MathFlux.averageBigInteger(Mono.just(new BigInteger(bigValue, 10))), new BigInteger(bigValue, 10)); + verifyResult( + MathFlux.averageBigInteger(Flux.just(BigInteger.valueOf(longValue), new BigInteger(bigValue, 10))), + new BigInteger(bigValue, 10).add(BigInteger.valueOf(longValue)).divide(BigInteger.valueOf(2L)) + ); + } + + @Test + public void noProgressivePrecisionLossAverageBigInteger() { + Flux numbers = Flux.just(1001.4d, 1000.2d, 1001.4d); + //3003 / 3 == 1001. rounding down each number would instead result in 3002 / 3 == 1000 + + verifyResult(MathFlux.averageBigInteger(numbers), BigInteger.valueOf(1001)); + } + + @Test + public void finalPrecisionLossAverageBigInteger() { + Flux numbers = Flux.just(1001.4d, 1000.2d, 1000.4d); + + verifyResult(MathFlux.averageBigInteger(numbers), BigInteger.valueOf(1000)); + } + @Test public void fluxAverageBigDecimal() { int count = 10;