Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,13 @@ dependencies {
testImplementation "org.reactivestreams:reactive-streams-tck-flow:$reactiveStreamsVersion"

testImplementation "org.junit.jupiter:junit-jupiter:$jupiterVersion"
testRuntimeOnly "org.junit.vintage:junit-vintage-engine:$jupiterVersion"

// The missing piece – required by Gradle 9+
testRuntimeOnly "org.junit.platform:junit-platform-launcher:$jupiterLauncherVersion" // match your JUnit version family
// Explicitly add these for IDE compatibility (especially Eclipse)
testImplementation "org.junit.platform:junit-platform-commons:$jupiterLauncherVersion"
testImplementation "org.junit.platform:junit-platform-launcher:$jupiterLauncherVersion"

testRuntimeOnly "org.junit.vintage:junit-vintage-engine:$jupiterVersion"
testRuntimeOnly "org.junit.platform:junit-platform-launcher:$jupiterLauncherVersion" // already have this
}

// === Experimental JDK handling for Outreach Program ===
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import io.reactivex.rxjava4.annotations.NonNull;
import io.reactivex.rxjava4.disposables.*;
import io.reactivex.rxjava4.internal.util.AwaitCoordinatorStatic;
import io.reactivex.rxjava4.plugins.RxJavaPlugins;

/**
Expand Down Expand Up @@ -84,7 +85,7 @@ public CompletionStageDisposable(@NonNull CompletionStage<T> stage, @NonNull Dis
*/
public void await() {
state.lazySet(true);;
Streamer.await(stage);
AwaitCoordinatorStatic.await(stage);
}

/**
Expand All @@ -93,7 +94,7 @@ public void await() {
*/
public void await(DisposableContainer canceller) {
state.lazySet(true);;
Streamer.await(stage, canceller);
AwaitCoordinatorStatic.await(stage, canceller);
}

/**
Expand Down
46 changes: 45 additions & 1 deletion src/main/java/io/reactivex/rxjava4/core/Streamable.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@
package io.reactivex.rxjava4.core;

import java.lang.reflect.InvocationTargetException;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.*;

import io.reactivex.rxjava4.annotations.*;
import io.reactivex.rxjava4.disposables.*;
import io.reactivex.rxjava4.exceptions.Exceptions;
import io.reactivex.rxjava4.functions.*;
import io.reactivex.rxjava4.internal.operators.streamable.*;
import io.reactivex.rxjava4.internal.util.AwaitCoordinatorStatic;
import io.reactivex.rxjava4.schedulers.Schedulers;
import io.reactivex.rxjava4.subscribers.TestSubscriber;

Expand Down Expand Up @@ -170,6 +171,49 @@ static <T> Streamable<T> fromPublisher(@NonNull Flow.Publisher<T> source, @NonNu
.toStreamable();
}

/**
* Generates a sequence in order which the stages complete in any form.
* @param <T> the common element type
* @param stages the iterable of stages to be relayed in the order they complete
* @param executor the executor to run the blocking operator
* @return the new Streamable instance
*/
@SuppressWarnings("unchecked")
@NonNull
static <@NonNull T> Streamable<CompletionStage<T>> fromStages(@NonNull Iterable<? extends CompletionStage<? extends T>> stages, ExecutorService executor) {
return create(emitter -> {
var list = new ArrayList<CompletionStage<? extends T>>();
for(var stage : stages) {
list.add(stage);
}
while (list.size() != 0) {
var winner = AwaitCoordinatorStatic.awaitFirstIndex(list, emitter.canceller());
emitter.emit((CompletionStage<T>)list.remove(winner));
}
}, executor);
}

/**
* Emits the elements of each inner sequence produced by the outher sequence.
* @param <T> the common element type
* @param sources the streamable of inner streamables
* @param exec the executorservice where to run the virtual wait
* @return the new Streamable instance.
*/
static <@NonNull T> Streamable<T> concat(Streamable<? extends Streamable<? extends T>> sources, ExecutorService exec) {
return create(emitter -> {
try (var mainSource = sources.forEach(item -> {
try (var innerSource = item.forEach(inner -> {
emitter.emit(inner);
}, emitter.canceller().derive(), exec)) {
innerSource.await(emitter.canceller());
}
}, emitter.canceller(), exec)) {
mainSource.await(emitter.canceller());
};
}, exec);
}

// oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo
// Operators
// oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo
Expand Down
83 changes: 4 additions & 79 deletions src/main/java/io/reactivex/rxjava4/core/Streamer.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
package io.reactivex.rxjava4.core;

import java.util.*;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.concurrent.CompletionStage;

import io.reactivex.rxjava4.annotations.*;
import io.reactivex.rxjava4.annotations.NonNull;
import io.reactivex.rxjava4.disposables.*;
import io.reactivex.rxjava4.internal.util.AwaitCoordinator;

/**
* A realized stream which can then be consumed asynchronously in steps.
Expand All @@ -31,7 +31,7 @@
* TODO proper docs
* @since 4.0.0
*/
public interface Streamer<@NonNull T> extends AutoCloseable {
public interface Streamer<@NonNull T> extends AutoCloseable, AwaitCoordinator {

// oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo
// API
Expand Down Expand Up @@ -195,79 +195,4 @@ default void awaitFinish() {
default void awaitFinish(@NonNull DisposableContainer cancellation) {
await(finish(cancellation), cancellation);
}

// oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo
// ASYNC/AWAIT "Language" keyword implementations
// oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo

/**
* The {@code await} keyword for async/await.
* @param <T> the type of the returned value if any.
* @param stage the stage to await virtual-blockingly
* @return the awaited value
*/
@Nullable
static <T> T await(@NonNull CompletionStage<T> stage) {
return await(stage, null);
}

/**
* The cancellable {@code await} keyword for async/await.
* @param <T> the type of the returned value if any.
* @param stage the stage to await virtual-blockingly
* @param canceller the container that can trigger a cancellation on demand
* @return the awaited value
*/
@Nullable
static <T> T await(@NonNull CompletionStage<T> stage, @Nullable DisposableContainer canceller) {
var f = stage.toCompletableFuture();
if (canceller == null) {
return f.join();
}
var d = Disposable.fromFuture(f, true);
try (var _ = canceller.subscribe(d)) {
return f.join();
}
}

/**
* Runs a function while turning it into a CompletionStage with a canceller supplied too.
* @param <U> the return type of the function
* @param function the function to apply
* @param canceller the canceller to use
* @param executor the executor to use
* @return the new stage
*/
static <U> CompletionStage<U> runStage(Function<DisposableContainer, U> function,
DisposableContainer canceller, Executor executor) {
var loopback = new SerialDisposable();
canceller.add(loopback);

// new Exception().printStackTrace();

var f = CompletableFuture.supplyAsync(() -> {
try {
return function.apply(canceller);
} finally {
canceller.delete(loopback);
}
}, executor);

var d = Disposable.fromFuture(f, true);
loopback.replace(d);

return f;
}

/**
* Runs a function while turning it into a CompletionStage with a canceller supplied too.
* @param <U> the return type of the function
* @param function the function to apply
* @param canceller the canceller to use
* @return the new stage
*/
static <U> CompletionStage<U> runStage(Function<DisposableContainer, U> function,
DisposableContainer canceller) {
return runStage(function, canceller, Executors.newVirtualThreadPerTaskExecutor());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -268,4 +268,14 @@ public void reset() {
resources = null;
}
}

@Override
public DisposableContainer derive() {
var result = new CompositeDisposable();

add(result);
result.add(Disposable.fromRunnable(() -> delete(result)));

return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ public interface DisposableContainer extends Disposable {
*/
void clear();

/**
* Create a derived sub container that can get cancelled by this container,
* but cancelling the subcontainer does not cancel this container.
* @return the derived subcontainer
* @since 4.0
*/
DisposableContainer derive();

/**
* Registers a {@link Disposable} with this container so that it can be removed and disposed
* via a simple {@link #dispose()} call to the returned Disposable.
Expand Down Expand Up @@ -133,5 +141,10 @@ public void reset() {
public void clear() {
// Who cares?
}

@Override
public DisposableContainer derive() {
return NEVER;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,4 +198,14 @@ public void reset() {
}
}

@Override
public DisposableContainer derive() {
var result = new ListCompositeDisposable();

add(result);
result.add(Disposable.fromRunnable(() -> delete(result)));

return result;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.reactivex.rxjava4.disposables.DisposableContainer;
import io.reactivex.rxjava4.internal.fuseable.HasUpstreamPublisher;
import io.reactivex.rxjava4.internal.subscriptions.SubscriptionHelper;
import io.reactivex.rxjava4.internal.util.ExceptionHelper;
import io.reactivex.rxjava4.internal.util.*;
import io.reactivex.rxjava4.internal.virtual.VirtualResumable;

public record StreamableFromPublisher<T>(@NonNull Publisher<T> source,
Expand Down Expand Up @@ -89,7 +89,7 @@ public void onComplete() {
@Override
public @NonNull CompletionStage<Boolean> next(@NonNull DisposableContainer canceller) {
// System.out.println("next()");
return Streamer.runStage(_ -> {
return AwaitCoordinatorStatic.runStage(_ -> {
item.lazySet(null);
// System.out.println("Requesting the next item");
SubscriptionHelper.deferredRequest(upstream, requester, 1);
Expand Down Expand Up @@ -143,7 +143,7 @@ public void onComplete() {
@Override
public @NonNull CompletionStage<Void> finish(@NonNull DisposableContainer cancellation) {
// new Exception("StreamableFromPublisher::finish").printStackTrace();
return Streamer.runStage(_ -> {
return AwaitCoordinatorStatic.runStage(_ -> {
SubscriptionHelper.cancel(upstream);
return null;
}, cancellation, executor);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava4.internal.util;

import java.util.concurrent.*;
import java.util.function.Function;

import io.reactivex.rxjava4.annotations.*;
import io.reactivex.rxjava4.disposables.DisposableContainer;

/**
* Static methods to coordinate {@link CompletionStage}s for various operators.
*/
public interface AwaitCoordinator {

/**
* The {@code await} keyword for async/await.
* @param <T> the type of the returned value if any.
* @param stage the stage to await virtual-blockingly
* @return the awaited value
*/
@Nullable
default <T> T await(@NonNull CompletionStage<T> stage) {
return AwaitCoordinatorStatic.await(stage, null);
}

/**
* The cancellable {@code await} keyword for async/await.
* @param <T> the type of the returned value if any.
* @param stage the stage to await virtual-blockingly
* @param canceller the container that can trigger a cancellation on demand
* @return the awaited value
*/
@Nullable
default <T> T await(@NonNull CompletionStage<T> stage, @Nullable DisposableContainer canceller) {
return AwaitCoordinatorStatic.await(stage, canceller);
}

/**
* Runs a function while turning it into a CompletionStage with a canceller supplied too.
* @param <U> the return type of the function
* @param function the function to apply
* @param canceller the canceller to use
* @param executor the executor to use
* @return the new stage
*/
default <U> CompletionStage<U> runStage(Function<DisposableContainer, U> function,
DisposableContainer canceller, Executor executor) {
return AwaitCoordinatorStatic.<U>runStage(function, canceller, executor);
}

/**
* Runs a function while turning it into a CompletionStage with a canceller supplied too.
* @param <U> the return type of the function
* @param function the function to apply
* @param canceller the canceller to use
* @return the new stage
*/
default <U> CompletionStage<U> runStage(Function<DisposableContainer, U> function,
DisposableContainer canceller) {
return runStage(function, canceller, Executors.newVirtualThreadPerTaskExecutor());
}
}
Loading