From 045b7ce0ff554b37333751a5a2b4c790ce785ff1 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 1 Jul 2026 22:16:13 +0200 Subject: [PATCH 1/2] 4.x: Streamable map, filter, flatMap impl, fixes and reworks --- .../io/reactivex/rxjava4/core/Streamable.java | 105 ++++++-- .../io/reactivex/rxjava4/core/Streamer.java | 90 +------ .../streamable/StreamableFilter.java | 97 +++++++ .../streamable/StreamableFlatMap.java | 244 ++++++++++++++++++ .../operators/streamable/StreamableMap.java | 72 ++++++ .../streamable/StreamableMapOptional.java | 98 +++++++ .../rxjava4/observers/BaseTestConsumer.java | 17 ++ .../ObservableConcatMapSchedulerTest.java | 2 +- .../streamable/StreamableErrorTest.java | 1 - .../streamable/StreamableFilterTest.java | 101 ++++++++ .../streamable/StreamableFlatMapTest.java | 197 ++++++++++++++ .../streamable/StreamableMapOptionalTest.java | 102 ++++++++ .../streamable/StreamableMapTest.java | 54 ++++ .../subscribers/TestSubscriberTest.java | 24 ++ 14 files changed, 1098 insertions(+), 106 deletions(-) create mode 100644 src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFilter.java create mode 100644 src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFlatMap.java create mode 100644 src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableMap.java create mode 100644 src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableMapOptional.java create mode 100644 src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFilterTest.java create mode 100644 src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFlatMapTest.java create mode 100644 src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableMapOptionalTest.java create mode 100644 src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableMapTest.java diff --git a/src/main/java/io/reactivex/rxjava4/core/Streamable.java b/src/main/java/io/reactivex/rxjava4/core/Streamable.java index 66df4940bb..a1ff2ff2f6 100644 --- a/src/main/java/io/reactivex/rxjava4/core/Streamable.java +++ b/src/main/java/io/reactivex/rxjava4/core/Streamable.java @@ -19,6 +19,7 @@ import java.util.stream.Stream; import io.reactivex.rxjava4.annotations.*; +import io.reactivex.rxjava4.core.config.StandardConcurrentConfig; import io.reactivex.rxjava4.disposables.*; import io.reactivex.rxjava4.exceptions.Exceptions; import io.reactivex.rxjava4.functions.*; @@ -84,6 +85,20 @@ public interface Streamable<@NonNull T> { return RxJavaPlugins.onAssembly(new StreamableJust<>(item)); } + /** + * Filters out the upstream items that do not pass the given predicate + * @param predicate the callback that should return {@code true} to let the upstream value pass + * or {@code false} to ignore it and continue with the next upstream item + * @return the new {@code Streamable} instance + * @throw NullPointerException if {@code predicate} is {@code null} + */ + @CheckReturnValue + @NonNull + default Streamable filter(@NonNull Predicate predicate) { + Objects.requireNonNull(predicate, "predicate is null"); + return RxJavaPlugins.onAssembly(new StreamableFilter<>(this, predicate)); + } + /** * Streams all elements of the given items array. * @param the element type of the items @@ -427,6 +442,32 @@ default Streamable hide() { return RxJavaPlugins.onAssembly(new StreamableHide<>(this)); } + /** + * Maps each upstream item into another item via a mapper function. + * @param the element type of the mapping + * @param mapper the function that takes an upstream item and returns an item to be emitted + * to the downstream + * @return the new {@code Streamable} instance + * @throw NullPointerException if {@code mapper} is {@code null} + */ + default <@NonNull R> Streamable map(@NonNull Function mapper) { + Objects.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new StreamableMap<>(this, mapper)); + } + + /** + * Maps each upstream item into another, optional item via a mapper function that skips the empty optionals. + * @param the element type of the mapping + * @param mapper the function that takes an upstream item and returns an optional item to be emitted / skipped + * to the downstream + * @return the new {@code Streamable} instance + * @throw NullPointerException if {@code mapper} is {@code null} + */ + default <@NonNull R> Streamable mapOptional(@NonNull Function> mapper) { + Objects.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new StreamableMapOptional<>(this, mapper)); + } + /** * Transforms the upstream sequence into zero or more elements for the downstream. * @param the result element type @@ -469,6 +510,24 @@ default Streamable take(long n) { }); } + /** + * Maps each upstream item onto a {@code Streamable} and runs them concurrently while + * relaying inner items as first-come-first-served manner. + * @param the element type of the output sequence + * @param mapper the function that turns an upstream item into a {@code Streamable} inner sequence + * @param config the configuration record for this operator + * @return the new {@code Streamable} instance + * @throws NullPointerException if {@code mapper} or {@code config} is {@code null} + */ + @CheckReturnValue + @NonNull + default Streamable flatMap(@NonNull Function> mapper, + @NonNull StandardConcurrentConfig config) { + Objects.requireNonNull(mapper, "mapper is null"); + Objects.requireNonNull(config, "config is null"); + return RxJavaPlugins.onAssembly(new StreamableFlatMap<>(this, mapper, config.maxConcurrency())); + } + // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo // Consumption methods and outgoing converters // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo @@ -525,17 +584,21 @@ default CompletionStageDisposable forEach(@NonNull Consumer con Objects.requireNonNull(executor, "executor is null"); final Streamable me = this; var future = CompletableFuture.supplyAsync(() -> { - try (var str = me.stream(canceller)) { - while (!canceller.isDisposed()) { - if (str.awaitNext(canceller)) { - // System.out.println("Received " + str.current()); - consumer.accept(Objects.requireNonNull(str.current(), "The upstream Streamable " + me.getClass() + " produced a null element!")); - } else { - // System.out.println("EOF "); - break; + var str = me.stream(canceller); + try { + try { + while (!canceller.isDisposed()) { + if (str.awaitNext(canceller)) { + // System.out.println("Received " + str.current()); + consumer.accept(Objects.requireNonNull(str.current(), "The upstream Streamable " + me.getClass() + " produced a null element!")); + } else { + // System.out.println("EOF "); + break; + } } + } finally { + str.awaitFinish(canceller); } - // System.out.println("Canceller status after loop: " + canceller.isDisposed()); } catch (final Throwable crash) { Exceptions.throwIfFatal(crash); if (crash instanceof CompletionException ce) { @@ -567,19 +630,23 @@ default CompletionStageDisposable forEach( Objects.requireNonNull(executor, "executor is null"); final Streamable me = this; var future = CompletableFuture.supplyAsync(() -> { - try (var str = me.stream(canceller)) { + var str = me.stream(canceller); + try { + try { var stopper = Disposable.empty(); - while (!canceller.isDisposed() && !stopper.isDisposed()) { - if (str.awaitNext(canceller)) { - // System.out.println("Received " + str.current()); - var v = Objects.requireNonNull(str.current(), "The upstream Streamable " + me.getClass() + " produced a null element!"); - consumer.accept(v, stopper); - } else { - // System.out.println("EOF "); - break; + while (!canceller.isDisposed() && !stopper.isDisposed()) { + if (str.awaitNext(canceller)) { + // System.out.println("Received " + str.current()); + var v = Objects.requireNonNull(str.current(), "The upstream Streamable " + me.getClass() + " produced a null element!"); + consumer.accept(v, stopper); + } else { + // System.out.println("EOF "); + break; + } } + } finally { + str.awaitFinish(canceller); } - // System.out.println("Canceller status after loop: " + canceller.isDisposed()); } catch (final Throwable crash) { Exceptions.throwIfFatal(crash); if (crash instanceof CompletionException ce) { diff --git a/src/main/java/io/reactivex/rxjava4/core/Streamer.java b/src/main/java/io/reactivex/rxjava4/core/Streamer.java index 87562ec2f9..42eed8291b 100644 --- a/src/main/java/io/reactivex/rxjava4/core/Streamer.java +++ b/src/main/java/io/reactivex/rxjava4/core/Streamer.java @@ -31,7 +31,7 @@ * TODO proper docs * @since 4.0.0 */ -public interface Streamer<@NonNull T> extends AutoCloseable, AwaitCoordinator { +public interface Streamer<@NonNull T> extends AwaitCoordinator { // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo // API @@ -46,10 +46,11 @@ public interface Streamer<@NonNull T> extends AutoCloseable, AwaitCoordinator { CompletionStage next(@NonNull DisposableContainer cancellation); /** - * Returns the current element if {@link #next()} yielded {@code true}. - * Can be called multiple times between {@link #next()} calls. + * Returns the current element if {@link #next(DisposableContainer)} yielded {@code true}. + * Can be called multiple times between {@link #next(DisposableContainer)} calls. * @return the current element - * @throws NoSuchElementException before the very first {@link #next()} or after {@link #next()} returned {@code false} + * @throws NoSuchElementException before the very first {@link #next(DisposableContainer)} + * or after {@link #next(DisposableContainer)} returned {@code false} */ @NonNull T current(); @@ -67,80 +68,6 @@ public interface Streamer<@NonNull T> extends AutoCloseable, AwaitCoordinator { // HELPERS // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo - /** - * Determine if there are more elements available from the source. - * Uses a default, individual {@link CompositeDisposable} to manage cancellation. - * @return eventually true or false, indicating availability or termination - */ - @NonNull - default CompletionStage next() { - return next(new CompositeDisposable()); - } - - /** - * Make this Streamer a resource and a Closeable, allowing virtually blocking closing. - */ - @Override - default void close() { - awaitFinish(); - } - - /** - * Augments the streamer so that the given canceller is injected into the various - * lifecycle await calls. - * @param canceller the canceller to inject - * @return the augmented streamer - */ - default Streamer finishVia(@NonNull DisposableContainer canceller) { - Objects.requireNonNull(canceller, "canceller is null"); - if (this instanceof StreamerFinishViaDisposableContainerCanceller( - Streamer streamer, DisposableContainer canceller1 - )) { - if (streamer == this && canceller1 == canceller) { - // DO not rewrap! - return this; - } - } - - return new StreamerFinishViaDisposableContainerCanceller<>(this, canceller); - } - - /** - * Augments the base streamer with a canceller so that it can be injected at the various await calls. - * @param the element type of the stream - */ - static record StreamerFinishViaDisposableContainerCanceller( - @NonNull Streamer streamer, @NonNull DisposableContainer canceller) - implements Streamer { - - @Override - public @NonNull CompletionStage next(@NonNull DisposableContainer cancellation) { - // TODO Auto-generated method stub - return streamer.next(cancellation); - } - - @Override - public @NonNull T current() { - return streamer.current(); - } - - @Override - public @NonNull CompletionStage finish(@NonNull DisposableContainer cancellation) { - return streamer.finish(cancellation); - } - - } - - /** - * Moves and awaits the sequence's next element, returns false if there are no more - * data. - * @return true if the next element via {@link #current()} can be read, or false if - * the stream ended. - */ - default boolean awaitNext() { - return await(next()); - } - /** * Moves and awaits the sequence's next element, returns false if there are no more * data. @@ -152,13 +79,6 @@ default boolean awaitNext(@NonNull DisposableContainer cancellation) { return await(next(cancellation), cancellation); } - /** - * Finish and cleanup the sequence after its completion or cancellation. - */ - default void awaitFinish() { - await(finish(DisposableContainer.NEVER), DisposableContainer.NEVER); - } - /** * Who cancels the cancellation attempt? Another cancellation attempt! * @param cancellation the token to cancel and ongoing cancel attempt diff --git a/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFilter.java b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFilter.java new file mode 100644 index 0000000000..8dac61b9cd --- /dev/null +++ b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFilter.java @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.internal.operators.streamable; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import io.reactivex.rxjava4.annotations.NonNull; +import io.reactivex.rxjava4.core.*; +import io.reactivex.rxjava4.disposables.DisposableContainer; +import io.reactivex.rxjava4.exceptions.Exceptions; +import io.reactivex.rxjava4.functions.Predicate; +import io.reactivex.rxjava4.internal.fuseable.HasUpstreamStreamableSource; + +public record StreamableFilter( + @NonNull Streamable source, + @NonNull Predicate predicate) +implements Streamable, HasUpstreamStreamableSource { + + @Override + public @NonNull Streamer<@NonNull T> stream(@NonNull DisposableContainer cancellation) { + return new FilterStreamer<>(source.stream(cancellation), predicate); + } + + static final class FilterStreamer implements Streamer { + final Streamer upstream; + final Predicate predicate; + volatile T current; + + final AtomicInteger wip = new AtomicInteger(); + + FilterStreamer(Streamer upstream, Predicate predicate) { + this.upstream = upstream; + this.predicate = predicate; + } + + @Override + public @NonNull CompletionStage next(@NonNull DisposableContainer cancellation) { + var cf = new CompletableFuture(); + drain(cf, cancellation); + return cf; + } + + @Override + public @NonNull T current() { + return current; + } + + @Override + public @NonNull CompletionStage finish(@NonNull DisposableContainer cancellation) { + current = null; + return upstream.finish(cancellation); + } + + void drain(CompletableFuture cf, DisposableContainer cancellation) { + if (wip.getAndIncrement() != 0) { + return; + } + do { + upstream.next(cancellation) + .whenComplete((v, e) -> { + if (e != null) { + cf.completeExceptionally(e); + } else { + if (v) { + try { + var w = upstream.current(); + if (predicate.test(w)) { + current = w; + cf.complete(true); + } else { + drain(cf, cancellation); + } + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + cf.completeExceptionally(ex); + } + } else { + cf.complete(false); + } + } + }); + } while (wip.decrementAndGet() != 0); + } + } +} diff --git a/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFlatMap.java b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFlatMap.java new file mode 100644 index 0000000000..5eb5826ed4 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFlatMap.java @@ -0,0 +1,244 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.internal.operators.streamable; + +import java.io.Serial; +import java.util.Objects; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.function.BiConsumer; + +import io.reactivex.rxjava4.annotations.NonNull; +import io.reactivex.rxjava4.core.*; +import io.reactivex.rxjava4.disposables.DisposableContainer; +import io.reactivex.rxjava4.exceptions.Exceptions; +import io.reactivex.rxjava4.functions.Function; +import io.reactivex.rxjava4.internal.fuseable.HasUpstreamStreamableSource; +import io.reactivex.rxjava4.internal.queue.MpscLinkedQueue; +import io.reactivex.rxjava4.internal.util.AtomicThrowable; + +public record StreamableFlatMap( + Streamable source, + Function> mapper, + int maxConcurrency +) +implements Streamable, HasUpstreamStreamableSource { + + @Override + public @NonNull Streamer<@NonNull R> stream(@NonNull DisposableContainer cancellation) { + var result = new FlatMapMainStreamer<>(source.stream(cancellation), mapper, maxConcurrency, cancellation); + result.loopMain(); + return result; + } + + static final class FlatMapMainStreamer implements Streamer, BiConsumer { + + final Streamer upstream; + + final Function> mapper; + + final int maxConcurrency; + + final AtomicInteger wip = new AtomicInteger(); + + final DisposableContainer mainCanceller; + + final MpscLinkedQueue> queue; + + final AtomicInteger queueWip; + + final AtomicThrowable errors; + + final AtomicReference> ready; + + volatile R current; + + final AtomicInteger active; + + volatile boolean mainDone; + + FlatMapMainStreamer(Streamer upstream, + Function> mapper, + int maxConcurrency, + DisposableContainer container) { + this.upstream = upstream; + this.mapper = mapper; + this.maxConcurrency = maxConcurrency; + this.mainCanceller = container; + this.queue = new MpscLinkedQueue<>(); + this.queueWip = new AtomicInteger(); + this.errors = new AtomicThrowable(); + this.ready = new AtomicReference<>(); + this.active = new AtomicInteger(); + } + + @Override + public @NonNull CompletionStage next(DisposableContainer cancellation) { + var cf = new CompletableFuture(); + ready.lazySet(cf); + drain(); + return cf; + } + + @Override + public @NonNull R current() { + return current; + } + + @Override + public @NonNull CompletionStage finish(DisposableContainer canceller) { + current = null; + return upstream.finish(canceller); + } + + void drain() { + if (queueWip.getAndIncrement() != 0) { + return; + } + + do { + var cf = ready.get(); + if (cf != null) { + var md = mainDone; + var n = active.get(); + var inner = queue.poll(); + if (md && n == 0 && inner == null) { + ready.lazySet(null); + current = null; + var err = errors.get(); + if (err != null) { + cf.completeExceptionally(err); + } else { + cf.complete(false); + } + } else + if (inner != null) { + ready.lazySet(null); + current = inner.inner.current(); + inner.drain(); + cf.complete(true); + } + } + } while (queueWip.decrementAndGet() != 0); + } + + void loopMain() { + if (wip.getAndIncrement() != 0) { + return; + } + + do { + if (!mainDone) { + upstream.next(mainCanceller).whenComplete(this); + } else { + drain(); + } + } while (wip.decrementAndGet() != 0); + } + + @Override + public void accept(Boolean v, Throwable e) { + if (e != null) { + errors.tryAddThrowableOrReport(e); + mainDone = true; + drain(); + } else + if (v) { + var n = active.incrementAndGet(); + + try { + var innerCanceller = mainCanceller.derive(); + var inner = Objects.requireNonNull(mapper.apply(upstream.current()), "The mapper returned a null value.") + .stream(innerCanceller); + + new InnerStreamer(inner, this, innerCanceller).drain(); + + if (n < maxConcurrency) { + loopMain(); + } + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + errors.tryAddThrowableOrReport(ex); + active.decrementAndGet(); + mainDone = true; + drain(); + } + } else { + mainDone = true; + drain(); + } + } + } + + static final class InnerStreamer extends AtomicInteger implements BiConsumer { + @Serial + private static final long serialVersionUID = 1840568797902248120L; + + final Streamer inner; + + final FlatMapMainStreamer parent; + + final DisposableContainer canceller; + + volatile boolean done; + + boolean finishing; + + InnerStreamer(Streamer inner, FlatMapMainStreamer parent, DisposableContainer canceller) { + this.inner = inner; + this.parent = parent; + this.canceller = canceller; + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + do { + if (!done) { + inner.next(canceller).whenComplete(this); + } else { + finishing = true; + inner.finish(canceller).whenComplete(this); + break; + } + } while (decrementAndGet() != 0); + } + + @Override + public void accept(Object t, Throwable u) { + if (finishing) { + parent.active.decrementAndGet(); + if (u != null) { + parent.errors.tryAddThrowableOrReport(u); + } + parent.loopMain(); + } else { + if (u != null) { + parent.errors.tryAddThrowableOrReport(u); + done = true; + drain(); + } else { + if ((Boolean)t) { + parent.queue.offer(this); + parent.drain(); + } else { + done = true; + drain(); + } + } + } + } + } +} diff --git a/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableMap.java b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableMap.java new file mode 100644 index 0000000000..af8fa4019d --- /dev/null +++ b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableMap.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.internal.operators.streamable; + +import java.util.Objects; +import java.util.concurrent.CompletionStage; + +import io.reactivex.rxjava4.annotations.NonNull; +import io.reactivex.rxjava4.core.*; +import io.reactivex.rxjava4.disposables.DisposableContainer; +import io.reactivex.rxjava4.functions.Function; +import io.reactivex.rxjava4.internal.fuseable.HasUpstreamStreamableSource; +import io.reactivex.rxjava4.internal.util.ExceptionHelper; + +public record StreamableMap( + @NonNull Streamable source, + @NonNull Function mapper) +implements Streamable, HasUpstreamStreamableSource { + + @Override + public @NonNull Streamer<@NonNull R> stream(@NonNull DisposableContainer cancellation) { + return new MapStreamer<>(source.stream(cancellation), mapper); + } + + static final class MapStreamer implements Streamer { + final Streamer upstream; + final Function mapper; + volatile R current; + MapStreamer(Streamer upstream, Function mapper) { + this.upstream = upstream; + this.mapper = mapper; + } + + @Override + public @NonNull CompletionStage next(@NonNull DisposableContainer cancellation) { + return upstream.next(cancellation) + .thenApply(e -> { + if (e) { + try { + current = Objects.requireNonNull(mapper.apply(upstream.current()), "The mapper returned a null value"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + return true; + } + return false; + }); + } + + @Override + public @NonNull R current() { + return current; + } + + @Override + public @NonNull CompletionStage finish(@NonNull DisposableContainer cancellation) { + current = null; + return upstream.finish(cancellation); + } + } +} diff --git a/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableMapOptional.java b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableMapOptional.java new file mode 100644 index 0000000000..7210356263 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableMapOptional.java @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.internal.operators.streamable; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import io.reactivex.rxjava4.annotations.NonNull; +import io.reactivex.rxjava4.core.*; +import io.reactivex.rxjava4.disposables.DisposableContainer; +import io.reactivex.rxjava4.exceptions.Exceptions; +import io.reactivex.rxjava4.functions.Function; +import io.reactivex.rxjava4.internal.fuseable.HasUpstreamStreamableSource; + +public record StreamableMapOptional( + @NonNull Streamable source, + @NonNull Function> mapper) +implements Streamable, HasUpstreamStreamableSource { + + @Override + public @NonNull Streamer<@NonNull R> stream(@NonNull DisposableContainer cancellation) { + return new MapStreamer<>(source.stream(cancellation), mapper); + } + + static final class MapStreamer implements Streamer { + final Streamer upstream; + final Function> mapper; + volatile R current; + + final AtomicInteger wip = new AtomicInteger(); + + MapStreamer(Streamer upstream, Function> mapper) { + this.upstream = upstream; + this.mapper = mapper; + } + + @Override + public @NonNull CompletionStage next(@NonNull DisposableContainer cancellation) { + var cf = new CompletableFuture(); + drain(cf, cancellation); + return cf; + } + + @Override + public @NonNull R current() { + return current; + } + + @Override + public @NonNull CompletionStage finish(@NonNull DisposableContainer cancellation) { + current = null; + return upstream.finish(cancellation); + } + + void drain(CompletableFuture cf, DisposableContainer cancellation) { + if (wip.getAndIncrement() != 0) { + return; + } + do { + upstream.next(cancellation) + .whenComplete((v, e) -> { + if (e != null) { + cf.completeExceptionally(e); + } else { + if (v) { + try { + var w = Objects.requireNonNull(mapper.apply(upstream.current()), "The mapper returned a null value"); + if (w.isPresent()) { + current = w.get(); + cf.complete(true); + } else { + drain(cf, cancellation); + } + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + cf.completeExceptionally(ex); + } + } else { + cf.complete(false); + } + } + }); + } while (wip.decrementAndGet() != 0); + } + } +} diff --git a/src/main/java/io/reactivex/rxjava4/observers/BaseTestConsumer.java b/src/main/java/io/reactivex/rxjava4/observers/BaseTestConsumer.java index ebbcdc7939..57a3c41bd7 100644 --- a/src/main/java/io/reactivex/rxjava4/observers/BaseTestConsumer.java +++ b/src/main/java/io/reactivex/rxjava4/observers/BaseTestConsumer.java @@ -543,6 +543,23 @@ public final U assertResult(@NonNull T... values) { .assertComplete(); } + /** + * Assert that the upstream signaled any of the given values in any order. + * @param values the values to check in the received values list + * @return this + */ + @SuppressWarnings("unchecked") + @SafeVarargs + public final U assertValueSet(T... values) { + Set expectedSet = new HashSet<>(Arrays.asList(values)); + for (T t : this.values) { + if (!expectedSet.contains(t)) { + throw fail("Item not in the set: " + BaseTestConsumer.valueAndClass(t)); + } + } + return (U)this; + } + /** * Assert that the upstream signaled the specified values in order * and then failed with a specific class or subclass of {@link Throwable}. diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableConcatMapSchedulerTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableConcatMapSchedulerTest.java index a95dd015a8..af66c32e89 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableConcatMapSchedulerTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableConcatMapSchedulerTest.java @@ -39,7 +39,7 @@ import io.reactivex.rxjava4.subjects.*; import io.reactivex.rxjava4.testsupport.*; -public class ObservableConcatMapSchedulerTest { +public class ObservableConcatMapSchedulerTest extends RxJavaTest { @Test public void boundaryFusion() { diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableErrorTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableErrorTest.java index 75f3b5166e..af30bdfbdb 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableErrorTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableErrorTest.java @@ -45,7 +45,6 @@ public void normal() throws Throwable { .assertFailure(TestException.class); } - @SuppressWarnings("resource") @Test public void currentThrows() { assertThrows(IllegalStateException.class, () -> { diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFilterTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFilterTest.java new file mode 100644 index 0000000000..b5c943bec0 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFilterTest.java @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.internal.operators.streamable; + +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Isolated; + +import io.reactivex.rxjava4.core.Streamable; +import io.reactivex.rxjava4.exceptions.TestException; + +@Isolated +public class StreamableFilterTest extends StreamableBaseTest { + + @Test + public void basic() { + Streamable.range(1, 5) + .filter(_ -> true) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void basicDebug() throws Throwable { + withCachedExecutor(exec -> { + Streamable.range(1, 5) + .filter(_ -> true) + .test(exec) + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3, 4, 5); + }); + } + + @Test + public void allEmpty() { + Streamable.range(1, 5) + .filter(_ -> false) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(); + } + + @Test + public void someEmpty() { + Streamable.range(1, 5) + .filter(v -> v % 2 == 0) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(2, 4); + } + + @Test + public void middleEmpty() { + Streamable.range(1, 5) + .filter(v -> v == 1 || v == 5) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 5); + } + + @Test + public void mapperCrash() { + Streamable.range(1, 5) + .filter(_ -> { throw new TestException(); }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } + + @Test + public void sourceError() { + Streamable.error(new TestException()) + .filter(_ -> true) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } + + @Test + public void sourceError2() { + Streamable.error(new TestException()) + .filter(_ -> false) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } + +} diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFlatMapTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFlatMapTest.java new file mode 100644 index 0000000000..94c9f0edf5 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFlatMapTest.java @@ -0,0 +1,197 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.internal.operators.streamable; + +import java.util.concurrent.*; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Isolated; + +import io.reactivex.rxjava4.annotations.NonNull; +import io.reactivex.rxjava4.core.*; +import io.reactivex.rxjava4.core.config.StandardConcurrentConfig; +import io.reactivex.rxjava4.disposables.DisposableContainer; +import io.reactivex.rxjava4.exceptions.TestException; +import io.reactivex.rxjava4.schedulers.Schedulers; + +@Isolated +public class StreamableFlatMapTest extends StreamableBaseTest { + + @Test + public void basic() { + Streamable.range(1, 5) + .flatMap(v -> Streamable.range(10 * v, 3), StandardConcurrentConfig.DEFAULT) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueSet(10, 11, 12, 20, 21, 22, 30, 31, 32, 40, 41, 42, 50, 51, 52) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void basicDebug() throws Throwable { + withCachedExecutor(exec -> { + Streamable.range(1, 5) + .flatMap(v -> Streamable.range(10 * v, 3), StandardConcurrentConfig.DEFAULT) + .test(exec) + .awaitDone(5, TimeUnit.SECONDS) + .assertValueSet(10, 11, 12, 20, 21, 22, 30, 31, 32, 40, 41, 42, 50, 51, 52) + .assertNoErrors() + .assertComplete(); + }); + } + + @Test + public void basicDebugOne() throws Throwable { + withCachedExecutor(exec -> { + Streamable.just(1) + .flatMap(v -> Streamable.just(10 * v), StandardConcurrentConfig.DEFAULT) + .test(exec) + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(10); + }); + } + + @Test + public void basicDebugOneInnerMany() throws Throwable { + withCachedExecutor(exec -> { + Streamable.just(1) + .flatMap(v -> Streamable.range(10 * v, 3), StandardConcurrentConfig.DEFAULT) + .test(exec) + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(10, 11, 12); + }); + } + + @Test + public void concurrentMany() { + Streamable.range(1, 5) + .flatMap(v -> Flowable.range(10 * v, 3).subscribeOn(Schedulers.computation()).toStreamable(), StandardConcurrentConfig.DEFAULT) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueSet(10, 11, 12, 20, 21, 22, 30, 31, 32, 40, 41, 42, 50, 51, 52) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void concurrentOneOne() { + Streamable.just(1) + .flatMap(v -> Flowable.just(v * 10).subscribeOn(Schedulers.computation()).toStreamable(), StandardConcurrentConfig.DEFAULT) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(10); + } + + @Test + public void concurrentOneOneDebug() throws Throwable { + withCachedExecutor(exec -> { + Streamable.just(1) + .flatMap(v -> Flowable.just(v * 10).subscribeOn(Schedulers.computation()).toStreamable(exec), StandardConcurrentConfig.DEFAULT) + .test(exec) + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(10); + }); + } + + @Test + public void concurrentOneToError() { + Streamable.just(1) + .flatMap(_ -> Streamable.error(new TestException()), StandardConcurrentConfig.DEFAULT) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } + + @Test + public void concurrentMainError() { + Streamable.error(new TestException()) + .flatMap(_ -> Streamable.just(1), StandardConcurrentConfig.DEFAULT) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } + + @Test + public void basicMaxConcurrent() { + Streamable.range(1, 5) + .flatMap(v -> Streamable.range(10 * v, 3), new StandardConcurrentConfig(1)) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(10, 11, 12, 20, 21, 22, 30, 31, 32, 40, 41, 42, 50, 51, 52) + ; + } + + @Test + public void mapperNull() { + Streamable.range(1, 5) + .flatMap(_ -> null, new StandardConcurrentConfig(1)) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(NullPointerException.class) + ; + } + + @Test + public void mapperCrash() { + Streamable.range(1, 5) + .flatMap(_ -> { throw new TestException(); }, new StandardConcurrentConfig(1)) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class) + ; + } + + @Test + public void mapperNullDebug() throws Throwable { + withCachedExecutor(exec -> { + Streamable.range(1, 5) + .flatMap(_ -> null, new StandardConcurrentConfig(1)) + .test(exec) + .awaitDone(5, TimeUnit.MINUTES) + .assertFailure(NullPointerException.class) + ; + }); + } + + @Test + public void mapperFailingFinisher() throws Throwable { + withCachedExecutor(exec -> { + Streamable.just(1) + .flatMap(_ -> (Streamable)c -> new FailingFinishStreamer(c), new StandardConcurrentConfig(1)) + .test(exec) + .awaitDone(5, TimeUnit.MINUTES) + .assertFailure(TestException.class) + ; + }); + } + + public record FailingFinishStreamer(DisposableContainer canceller) implements Streamer { + + @Override + public @NonNull CompletionStage next(@NonNull DisposableContainer cancellation) { + return NEXT_FALSE; + } + + @Override + public @NonNull Object current() { + return null; + } + + @Override + public @NonNull CompletionStage finish(@NonNull DisposableContainer cancellation) { + return CompletableFuture.failedFuture(new TestException()); + } + } +} diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableMapOptionalTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableMapOptionalTest.java new file mode 100644 index 0000000000..d5836eb9a6 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableMapOptionalTest.java @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.internal.operators.streamable; + +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Isolated; + +import io.reactivex.rxjava4.core.Streamable; +import io.reactivex.rxjava4.exceptions.TestException; + +@Isolated +public class StreamableMapOptionalTest extends StreamableBaseTest { + + @Test + public void basic() { + Streamable.range(1, 5) + .mapOptional(v -> Optional.of(v.toString())) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult("1", "2", "3", "4", "5"); + } + + @Test + public void basicDebug() throws Throwable { + withCachedExecutor(exec -> { + Streamable.range(1, 5) + .mapOptional(v -> Optional.of(v.toString())) + .test(exec) + .awaitDone(5, TimeUnit.SECONDS) + .assertResult("1", "2", "3", "4", "5"); + }); + } + + @Test + public void allEmpty() { + Streamable.range(1, 5) + .mapOptional(_ -> Optional.empty()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(); + } + + @Test + public void someEmpty() { + Streamable.range(1, 5) + .mapOptional(v -> v % 2 == 0 ? Optional.of(v.toString()) : Optional.empty()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult("2", "4"); + } + + @Test + public void middleEmpty() { + Streamable.range(1, 5) + .mapOptional(v -> v == 1 || v == 5 ? Optional.of(v.toString()) : Optional.empty()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult("1", "5"); + } + + @Test + public void mapperNull() { + Streamable.range(1, 5) + .mapOptional(_ -> null) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(NullPointerException.class); + } + + @Test + public void mapperCrash() { + Streamable.range(1, 5) + .mapOptional(_ -> { throw new TestException(); }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } + + @Test + public void sourceError() { + Streamable.error(new TestException()) + .mapOptional(v -> Optional.of(v)) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } + +} diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableMapTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableMapTest.java new file mode 100644 index 0000000000..8cd7e8c33b --- /dev/null +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableMapTest.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.internal.operators.streamable; + +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Isolated; + +import io.reactivex.rxjava4.core.Streamable; +import io.reactivex.rxjava4.exceptions.TestException; + +@Isolated +public class StreamableMapTest extends StreamableBaseTest { + + @Test + public void basic() { + Streamable.range(1, 5) + .map(v -> v.toString()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult("1", "2", "3", "4", "5"); + } + + @Test + public void mapperNull() { + Streamable.range(1, 5) + .map(_ -> null) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(NullPointerException.class); + } + + @Test + public void mapperCrash() { + Streamable.range(1, 5) + .map(_ -> { throw new TestException(); }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } + +} diff --git a/src/test/java/io/reactivex/rxjava4/subscribers/TestSubscriberTest.java b/src/test/java/io/reactivex/rxjava4/subscribers/TestSubscriberTest.java index c89e72c285..ebf0c568bb 100644 --- a/src/test/java/io/reactivex/rxjava4/subscribers/TestSubscriberTest.java +++ b/src/test/java/io/reactivex/rxjava4/subscribers/TestSubscriberTest.java @@ -1683,4 +1683,28 @@ public void awaitCountInterrupted() { } }); } + + @Test + public void assertValueSet() { + var ts = new TestSubscriber<>(); + ts.onSubscribe(new BooleanSubscription()); + ts.onNext(2); + ts.onNext(1); + ts.onNext(3); + + ts.assertValueSet(1, 2, 3); + } + + @Test + public void assertValueSetFail() { + assertThrows(AssertionError.class, () -> { + var ts = new TestSubscriber<>(); + ts.onSubscribe(new BooleanSubscription()); + ts.onNext(2); + ts.onNext(1); + ts.onNext(3); + + ts.assertValueSet(1, 2); + }); + } } From 0acd24afe31ea02157ae7e417304a4b866c2cbef Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 1 Jul 2026 22:21:14 +0200 Subject: [PATCH 2/2] add some annotations --- src/main/java/io/reactivex/rxjava4/core/Streamable.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main/java/io/reactivex/rxjava4/core/Streamable.java b/src/main/java/io/reactivex/rxjava4/core/Streamable.java index a1ff2ff2f6..562b0cb7c3 100644 --- a/src/main/java/io/reactivex/rxjava4/core/Streamable.java +++ b/src/main/java/io/reactivex/rxjava4/core/Streamable.java @@ -450,6 +450,8 @@ default Streamable hide() { * @return the new {@code Streamable} instance * @throw NullPointerException if {@code mapper} is {@code null} */ + @CheckReturnValue + @NonNull default <@NonNull R> Streamable map(@NonNull Function mapper) { Objects.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new StreamableMap<>(this, mapper)); @@ -463,6 +465,8 @@ default Streamable hide() { * @return the new {@code Streamable} instance * @throw NullPointerException if {@code mapper} is {@code null} */ + @CheckReturnValue + @NonNull default <@NonNull R> Streamable mapOptional(@NonNull Function> mapper) { Objects.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new StreamableMapOptional<>(this, mapper));