-
Notifications
You must be signed in to change notification settings - Fork 196
Expand file tree
/
Copy pathStreamConvertersToJava.java
More file actions
95 lines (75 loc) · 2.79 KB
/
StreamConvertersToJava.java
File metadata and controls
95 lines (75 loc) · 2.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) 2020-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream.operators.converters;
import static org.junit.Assert.assertEquals;
// #import
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.BaseStream;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import jdocs.AbstractJavaTest;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.japi.function.Creator;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.stream.javadsl.StreamConverters;
// #import
import org.apache.pekko.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
 * Runnable examples of bridging Pekko Streams and {@code java.util.stream}: draining a Pekko
 * {@code Source} into a Java {@code Stream}, and wrapping a Java stream as a Pekko {@code Source}.
 *
 * <p>NOTE(review): the paired {@code // #name} comments look like snippet markers consumed by the
 * documentation build; keep them in place when editing.
 */
public class StreamConvertersToJava extends AbstractJavaTest {

  /**
   * Actor system handed to every {@code runWith} call below; assigned in {@link #setup()} and
   * cleared in {@link #tearDown()}.
   */
  static ActorSystem system;

  /** Creates the actor system once, before any test in this class runs. */
  @BeforeClass
  public static void setup() {
    system = ActorSystem.create("StreamConvertersToJava");
  }

  /** Shuts the actor system down after the last test and drops the static reference to it. */
  @AfterClass
  public static void tearDown() {
    TestKit.shutdownActorSystem(system);
    system = null;
  }

  /**
   * Runs a source of even numbers into the sink from {@code StreamConverters.asJavaStream()} and
   * checks that the materialized Java stream yields five elements.
   */
  @Test
  public void demonstrateConverterToJava8Stream() {
    // #asJavaStream
    Sink<Integer, Stream<Integer>> javaStreamSink = StreamConverters.asJavaStream();
    Source<Integer, NotUsed> evenNumbers = Source.range(0, 9).filter(n -> n % 2 == 0);
    Stream<Integer> javaStream = evenNumbers.runWith(javaStreamSink, system);
    // #asJavaStream

    long elementCount = javaStream.count();
    assertEquals(5L, elementCount);
  }

  /**
   * Same scenario as {@link #demonstrateConverterToJava8Stream()}, but obtains the sink through
   * {@code Sink.asJavaStream()} and passes it inline.
   */
  @Test
  public void demonstrateConverterToJava8StreamOnSink() {
    // #asJavaStreamOnSink
    Source<Integer, NotUsed> evenNumbers = Source.range(0, 9).filter(n -> n % 2 == 0);
    Stream<Integer> javaStream = evenNumbers.runWith(Sink.asJavaStream(), system);
    // #asJavaStreamOnSink

    long elementCount = javaStream.count();
    assertEquals(5L, elementCount);
  }

  /**
   * Wraps {@code IntStream.rangeClosed(0, 9)} as a Pekko source, runs it into {@code Sink.last()}
   * and checks that the last element is 9.
   *
   * @throws InterruptedException if the test thread is interrupted while waiting for the result
   * @throws ExecutionException if the completion stage completed exceptionally
   * @throws TimeoutException if no result is available within five seconds
   */
  @Test
  public void demonstrateCreatingASourceFromJava8Stream()
      throws InterruptedException, ExecutionException, TimeoutException {
    // #fromJavaStream
    Creator<BaseStream<Integer, IntStream>> streamFactory = () -> IntStream.rangeClosed(0, 9);
    Source<Integer, NotUsed> numbers = StreamConverters.fromJavaStream(streamFactory);
    Sink<Integer, CompletionStage<Integer>> lastElementSink = Sink.last();
    CompletionStage<Integer> lastElementStage = numbers.runWith(lastElementSink, system);
    // #fromJavaStream

    // Bounded wait so a stalled stream fails the test instead of hanging it.
    Integer lastElement = lastElementStage.toCompletableFuture().get(5, TimeUnit.SECONDS);
    assertEquals(9, lastElement.intValue());
  }
}