Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 90 additions & 19 deletions src/main/java/io/reactivex/rxjava4/core/Streamable.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.stream.Stream;

import io.reactivex.rxjava4.annotations.*;
import io.reactivex.rxjava4.core.config.StandardConcurrentConfig;
import io.reactivex.rxjava4.disposables.*;
import io.reactivex.rxjava4.exceptions.Exceptions;
import io.reactivex.rxjava4.functions.*;
Expand Down Expand Up @@ -84,6 +85,20 @@
return RxJavaPlugins.onAssembly(new StreamableJust<>(item));
}

/**
* Filters out the upstream items that do not pass the given predicate
* @param predicate the callback that should return {@code true} to let the upstream value pass
* or {@code false} to ignore it and continue with the next upstream item
* @return the new {@code Streamable} instance
* @throw NullPointerException if {@code predicate} is {@code null}

Check warning on line 93 in src/main/java/io/reactivex/rxjava4/core/Streamable.java

View workflow job for this annotation

GitHub Actions / build

unknown tag. Mistyped @throws or an unregistered custom tag?

Check warning on line 93 in src/main/java/io/reactivex/rxjava4/core/Streamable.java

View workflow job for this annotation

GitHub Actions / build (27)

unknown tag. Mistyped @throws or an unregistered custom tag?
*/
@CheckReturnValue
@NonNull
default Streamable<T> filter(@NonNull Predicate<? super T> predicate) {
Objects.requireNonNull(predicate, "predicate is null");
return RxJavaPlugins.onAssembly(new StreamableFilter<>(this, predicate));
}

/**
* Streams all elements of the given items array.
* @param <T> the element type of the items
Expand Down Expand Up @@ -427,6 +442,36 @@
return RxJavaPlugins.onAssembly(new StreamableHide<>(this));
}

/**
* Maps each upstream item into another item via a mapper function.
* @param <R> the element type of the mapping
* @param mapper the function that takes an upstream item and returns an item to be emitted
* to the downstream
* @return the new {@code Streamable} instance
* @throw NullPointerException if {@code mapper} is {@code null}

Check warning on line 451 in src/main/java/io/reactivex/rxjava4/core/Streamable.java

View workflow job for this annotation

GitHub Actions / build

unknown tag. Mistyped @throws or an unregistered custom tag?

Check warning on line 451 in src/main/java/io/reactivex/rxjava4/core/Streamable.java

View workflow job for this annotation

GitHub Actions / build (27)

unknown tag. Mistyped @throws or an unregistered custom tag?
*/
@CheckReturnValue
@NonNull
default <@NonNull R> Streamable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new StreamableMap<>(this, mapper));
}

/**
* Maps each upstream item into another, optional item via a mapper function that skips the empty optionals.
* @param <R> the element type of the mapping
* @param mapper the function that takes an upstream item and returns an optional item to be emitted / skipped
* to the downstream
* @return the new {@code Streamable} instance
* @throw NullPointerException if {@code mapper} is {@code null}

Check warning on line 466 in src/main/java/io/reactivex/rxjava4/core/Streamable.java

View workflow job for this annotation

GitHub Actions / build

unknown tag. Mistyped @throws or an unregistered custom tag?

Check warning on line 466 in src/main/java/io/reactivex/rxjava4/core/Streamable.java

View workflow job for this annotation

GitHub Actions / build (27)

unknown tag. Mistyped @throws or an unregistered custom tag?
*/
@CheckReturnValue
@NonNull
default <@NonNull R> Streamable<R> mapOptional(@NonNull Function<? super T, ? extends Optional<? extends R>> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new StreamableMapOptional<>(this, mapper));
}

/**
* Transforms the upstream sequence into zero or more elements for the downstream.
* @param <R> the result element type
Expand Down Expand Up @@ -469,6 +514,24 @@
});
}

/**
* Maps each upstream item onto a {@code Streamable} and runs them concurrently while
* relaying inner items as first-come-first-served manner.
* @param <R> the element type of the output sequence
* @param mapper the function that turns an upstream item into a {@code Streamable} inner sequence
* @param config the configuration record for this operator
* @return the new {@code Streamable} instance
* @throws NullPointerException if {@code mapper} or {@code config} is {@code null}
*/
@CheckReturnValue
@NonNull
default <R> Streamable<R> flatMap(@NonNull Function<? super T, ? extends Streamable<? extends R>> mapper,
@NonNull StandardConcurrentConfig config) {
Objects.requireNonNull(mapper, "mapper is null");
Objects.requireNonNull(config, "config is null");
return RxJavaPlugins.onAssembly(new StreamableFlatMap<>(this, mapper, config.maxConcurrency()));
}

// oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo
// Consumption methods and outgoing converters
// oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo
Expand Down Expand Up @@ -525,17 +588,21 @@
Objects.requireNonNull(executor, "executor is null");
final Streamable<T> me = this;
var future = CompletableFuture.<Void>supplyAsync(() -> {
try (var str = me.stream(canceller)) {
while (!canceller.isDisposed()) {
if (str.awaitNext(canceller)) {
// System.out.println("Received " + str.current());
consumer.accept(Objects.requireNonNull(str.current(), "The upstream Streamable " + me.getClass() + " produced a null element!"));
} else {
// System.out.println("EOF ");
break;
var str = me.stream(canceller);
try {
try {
while (!canceller.isDisposed()) {
if (str.awaitNext(canceller)) {
// System.out.println("Received " + str.current());
consumer.accept(Objects.requireNonNull(str.current(), "The upstream Streamable " + me.getClass() + " produced a null element!"));
} else {
// System.out.println("EOF ");
break;
}
}
} finally {
str.awaitFinish(canceller);
}
// System.out.println("Canceller status after loop: " + canceller.isDisposed());
} catch (final Throwable crash) {
Exceptions.throwIfFatal(crash);
if (crash instanceof CompletionException ce) {
Expand Down Expand Up @@ -567,19 +634,23 @@
Objects.requireNonNull(executor, "executor is null");
final Streamable<T> me = this;
var future = CompletableFuture.<Void>supplyAsync(() -> {
try (var str = me.stream(canceller)) {
var str = me.stream(canceller);
try {
try {
var stopper = Disposable.empty();
while (!canceller.isDisposed() && !stopper.isDisposed()) {
if (str.awaitNext(canceller)) {
// System.out.println("Received " + str.current());
var v = Objects.requireNonNull(str.current(), "The upstream Streamable " + me.getClass() + " produced a null element!");
consumer.accept(v, stopper);
} else {
// System.out.println("EOF ");
break;
while (!canceller.isDisposed() && !stopper.isDisposed()) {
if (str.awaitNext(canceller)) {
// System.out.println("Received " + str.current());
var v = Objects.requireNonNull(str.current(), "The upstream Streamable " + me.getClass() + " produced a null element!");
consumer.accept(v, stopper);
} else {
// System.out.println("EOF ");
break;
}
}
} finally {
str.awaitFinish(canceller);
}
// System.out.println("Canceller status after loop: " + canceller.isDisposed());
} catch (final Throwable crash) {
Exceptions.throwIfFatal(crash);
if (crash instanceof CompletionException ce) {
Expand Down
90 changes: 5 additions & 85 deletions src/main/java/io/reactivex/rxjava4/core/Streamer.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

/**
* A realized stream which can then be consumed asynchronously in steps.
* Think of it as the {@IAsyncEnumerator} of the Java world. Runs best on Virtual Threads.

Check warning on line 25 in src/main/java/io/reactivex/rxjava4/core/Streamer.java

View workflow job for this annotation

GitHub Actions / build

unknown tag. Unregistered custom tag?

Check warning on line 25 in src/main/java/io/reactivex/rxjava4/core/Streamer.java

View workflow job for this annotation

GitHub Actions / build

unknown tag. Unregistered custom tag?

Check warning on line 25 in src/main/java/io/reactivex/rxjava4/core/Streamer.java

View workflow job for this annotation

GitHub Actions / build (27)

unknown tag. Unregistered custom tag?

Check warning on line 25 in src/main/java/io/reactivex/rxjava4/core/Streamer.java

View workflow job for this annotation

GitHub Actions / build (27)

unknown tag. Unregistered custom tag?
* <p>
* To make sure you can run finish, use {@link DisposableContainer#clear()} or {@link DisposableContainer#reset()}
* to get rid of all previous registered disposables. finish() will create its own, and if that
Expand All @@ -31,7 +31,7 @@
* TODO proper docs
* @since 4.0.0
*/
public interface Streamer<@NonNull T> extends AutoCloseable, AwaitCoordinator {
public interface Streamer<@NonNull T> extends AwaitCoordinator {

// oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo
// API
Expand All @@ -46,10 +46,11 @@
CompletionStage<Boolean> next(@NonNull DisposableContainer cancellation);

/**
* Returns the current element if {@link #next()} yielded {@code true}.
* Can be called multiple times between {@link #next()} calls.
* Returns the current element if {@link #next(DisposableContainer)} yielded {@code true}.
* Can be called multiple times between {@link #next(DisposableContainer)} calls.
* @return the current element
* @throws NoSuchElementException before the very first {@link #next()} or after {@link #next()} returned {@code false}
* @throws NoSuchElementException before the very first {@link #next(DisposableContainer)}
* or after {@link #next(DisposableContainer)} returned {@code false}
*/
@NonNull
T current();
Expand All @@ -67,80 +68,6 @@
// HELPERS
// oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo

/**
* Determine if there are more elements available from the source.
* Uses a default, individual {@link CompositeDisposable} to manage cancellation.
* @return eventually true or false, indicating availability or termination
*/
@NonNull
default CompletionStage<Boolean> next() {
return next(new CompositeDisposable());
}

/**
* Make this Streamer a resource and a Closeable, allowing virtually blocking closing.
*/
@Override
default void close() {
awaitFinish();
}

/**
* Augments the streamer so that the given canceller is injected into the various
* lifecycle await calls.
* @param canceller the canceller to inject
* @return the augmented streamer
*/
default Streamer<T> finishVia(@NonNull DisposableContainer canceller) {
Objects.requireNonNull(canceller, "canceller is null");
if (this instanceof StreamerFinishViaDisposableContainerCanceller<T>(
Streamer<T> streamer, DisposableContainer canceller1
)) {
if (streamer == this && canceller1 == canceller) {
// DO not rewrap!
return this;
}
}

return new StreamerFinishViaDisposableContainerCanceller<>(this, canceller);
}

/**
* Augments the base streamer with a canceller so that it can be injected at the various await calls.
* @param <T> the element type of the stream
*/
static record StreamerFinishViaDisposableContainerCanceller<T>(
@NonNull Streamer<T> streamer, @NonNull DisposableContainer canceller)
implements Streamer<T> {

@Override
public @NonNull CompletionStage<Boolean> next(@NonNull DisposableContainer cancellation) {
// TODO Auto-generated method stub
return streamer.next(cancellation);
}

@Override
public @NonNull T current() {
return streamer.current();
}

@Override
public @NonNull CompletionStage<Void> finish(@NonNull DisposableContainer cancellation) {
return streamer.finish(cancellation);
}

}

/**
* Moves and awaits the sequence's next element, returns false if there are no more
* data.
* @return true if the next element via {@link #current()} can be read, or false if
* the stream ended.
*/
default boolean awaitNext() {
return await(next());
}

/**
* Moves and awaits the sequence's next element, returns false if there are no more
* data.
Expand All @@ -152,13 +79,6 @@
return await(next(cancellation), cancellation);
}

/**
* Finish and cleanup the sequence after its completion or cancellation.
*/
default void awaitFinish() {
await(finish(DisposableContainer.NEVER), DisposableContainer.NEVER);
}

/**
* Who cancels the cancellation attempt? Another cancellation attempt!
* @param cancellation the token to cancel and ongoing cancel attempt
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava4.internal.operators.streamable;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import io.reactivex.rxjava4.annotations.NonNull;
import io.reactivex.rxjava4.core.*;
import io.reactivex.rxjava4.disposables.DisposableContainer;
import io.reactivex.rxjava4.exceptions.Exceptions;
import io.reactivex.rxjava4.functions.Predicate;
import io.reactivex.rxjava4.internal.fuseable.HasUpstreamStreamableSource;

public record StreamableFilter<T>(
@NonNull Streamable<T> source,
@NonNull Predicate<? super T> predicate)
implements Streamable<T>, HasUpstreamStreamableSource<T> {

@Override
public @NonNull Streamer<@NonNull T> stream(@NonNull DisposableContainer cancellation) {
return new FilterStreamer<>(source.stream(cancellation), predicate);
}

static final class FilterStreamer<T> implements Streamer<T> {
final Streamer<T> upstream;
final Predicate<? super T> predicate;
volatile T current;

final AtomicInteger wip = new AtomicInteger();

FilterStreamer(Streamer<T> upstream, Predicate<? super T> predicate) {
this.upstream = upstream;
this.predicate = predicate;
}

@Override
public @NonNull CompletionStage<Boolean> next(@NonNull DisposableContainer cancellation) {
var cf = new CompletableFuture<Boolean>();
drain(cf, cancellation);
return cf;
}

@Override
public @NonNull T current() {
return current;
}

@Override
public @NonNull CompletionStage<Void> finish(@NonNull DisposableContainer cancellation) {
current = null;
return upstream.finish(cancellation);
}

void drain(CompletableFuture<Boolean> cf, DisposableContainer cancellation) {
if (wip.getAndIncrement() != 0) {
return;
}
do {
upstream.next(cancellation)
.whenComplete((v, e) -> {
if (e != null) {
cf.completeExceptionally(e);
} else {
if (v) {
try {
var w = upstream.current();
if (predicate.test(w)) {
current = w;
cf.complete(true);
} else {
drain(cf, cancellation);
}
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
cf.completeExceptionally(ex);
}
} else {
cf.complete(false);
}
}
});
} while (wip.decrementAndGet() != 0);
}
}
}
Loading
Loading