Skip to content

Commit bed9477

Browse files
committed
Chapter 6
1 parent 1a6e3e2 commit bed9477

File tree

6 files changed

+399
-0
lines changed

6 files changed

+399
-0
lines changed
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package lambdasinaction.chap6;
2+
3+
import java.util.concurrent.RecursiveTask;
4+
import java.util.concurrent.ForkJoinPool;
5+
import java.util.concurrent.ForkJoinTask;
6+
import java.util.stream.LongStream;
7+
8+
import static lambdasinaction.chap6.ParallelStreamsHarness.FORK_JOIN_POOL;
9+
10+
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
11+
12+
public static final long THRESHOLD = 10_000;
13+
14+
private final long[] numbers;
15+
private final int start;
16+
private final int end;
17+
18+
public ForkJoinSumCalculator(long[] numbers) {
19+
this(numbers, 0, numbers.length);
20+
}
21+
22+
private ForkJoinSumCalculator(long[] numbers, int start, int end) {
23+
this.numbers = numbers;
24+
this.start = start;
25+
this.end = end;
26+
}
27+
28+
@Override
29+
protected Long compute() {
30+
int length = end - start;
31+
if (length <= THRESHOLD) {
32+
return computeSequentially();
33+
}
34+
ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2);
35+
leftTask.fork();
36+
ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length/2, end);
37+
Long rightResult = rightTask.compute();
38+
Long leftResult = leftTask.join();
39+
return leftResult + rightResult;
40+
}
41+
42+
private long computeSequentially() {
43+
long sum = 0;
44+
for (int i = start; i < end; i++) {
45+
sum += numbers[i];
46+
}
47+
return sum;
48+
}
49+
50+
public static long forkJoinSum(long n) {
51+
long[] numbers = LongStream.rangeClosed(1, n).toArray();
52+
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
53+
return FORK_JOIN_POOL.invoke(task);
54+
}
55+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package lambdasinaction.chap6;
2+
3+
import java.util.stream.*;
4+
5+
public class ParallelStreams {
6+
7+
public static long sum(long n) {
8+
return Stream.iterate(1L, i -> i + 1).limit(n).reduce(Long::sum).get();
9+
}
10+
11+
public static long parallelSum(long n) {
12+
return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(Long::sum).get();
13+
}
14+
15+
public static long rangedSum(long n) {
16+
return LongStream.rangeClosed(1, n).reduce(Long::sum).getAsLong();
17+
}
18+
19+
public static long parallelRangedSum(long n) {
20+
return LongStream.rangeClosed(1, n).parallel().reduce(Long::sum).getAsLong();
21+
}
22+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package lambdasinaction.chap6;
2+
3+
import java.util.concurrent.*;
4+
import java.util.function.*;
5+
6+
public class ParallelStreamsHarness {
7+
8+
public static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool();
9+
10+
public static void main(String[] args) {
11+
System.out.println("Sum done in: " + measurePerf(ParallelStreams::sum, 10_000_000L) + " msecs");
12+
System.out.println("Parallel forkJoinSum done in: " + measurePerf(ParallelStreams::parallelSum, 10_000_000L) + " msecs" );
13+
System.out.println("Range forkJoinSum done in: " + measurePerf(ParallelStreams::rangedSum, 10_000_000L) + " msecs");
14+
System.out.println("Parallel range forkJoinSum done in: " + measurePerf(ParallelStreams::parallelRangedSum, 10_000_000L) + " msecs" );
15+
System.out.println("ForkJoin sum done in: " + measurePerf(ForkJoinSumCalculator::forkJoinSum, 10_000_000L) + " msecs" );
16+
}
17+
18+
public static <T, R> long measurePerf(Function<T, R> f, T input) {
19+
long fastest = Long.MAX_VALUE;
20+
for (int i = 0; i < 10; i++) {
21+
long start = System.nanoTime();
22+
R result = f.apply(input);
23+
long duration = (System.nanoTime() - start) / 1_000_000;
24+
System.out.println("Result: " + result);
25+
if (duration < fastest) fastest = duration;
26+
}
27+
return fastest;
28+
}
29+
}
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
package lambdasinaction.chap6;
2+
3+
import java.util.*;
4+
import java.util.concurrent.*;
5+
import java.util.function.*;
6+
import java.util.stream.*;
7+
8+
/**
9+
* Adapted from http://mail.openjdk.java.net/pipermail/lambda-dev/2013-November/011516.html
10+
*/
11+
public class StreamForker<T> {
12+
13+
private final Stream<T> stream;
14+
private final Map<Object, Function<Stream<T>, ?>> forks = new HashMap<>();
15+
16+
public StreamForker(Stream<T> stream) {
17+
this.stream = stream;
18+
}
19+
20+
public StreamForker<T> fork(Object key, Function<Stream<T>, ?> f) {
21+
forks.put(key, f);
22+
return this;
23+
}
24+
25+
public Results getResults() {
26+
ForkingStreamConsumer<T> consumer = build();
27+
try {
28+
stream.sequential().forEach(consumer);
29+
} finally {
30+
consumer.finish();
31+
}
32+
return consumer;
33+
}
34+
35+
private ForkingStreamConsumer<T> build() {
36+
List<BlockingQueue<T>> queues = new ArrayList<>();
37+
38+
Map<Object, Future<?>> actions =
39+
forks.entrySet().stream().reduce(
40+
new HashMap<Object, Future<?>>(),
41+
(map, e) -> {
42+
map.put(e.getKey(),
43+
getOperationResult(queues, e.getValue()));
44+
return map;
45+
},
46+
(m1, m2) -> {
47+
m1.putAll(m2);
48+
return m1;
49+
});
50+
51+
return new ForkingStreamConsumer<>(queues, actions);
52+
}
53+
54+
private Future<?> getOperationResult(List<BlockingQueue<T>> queues, Function<Stream<T>, ?> f) {
55+
BlockingQueue<T> queue = new LinkedBlockingQueue<>();
56+
queues.add(queue);
57+
Spliterator<T> spliterator = new BlockingQueueSpliterator<>(queue);
58+
Stream<T> source = StreamSupport.stream(spliterator, false);
59+
return CompletableFuture.supplyAsync( () -> f.apply(source) );
60+
}
61+
62+
public static interface Results {
63+
public <R> R get(Object key);
64+
}
65+
66+
private static class ForkingStreamConsumer<T> implements Consumer<T>, Results {
67+
static final Object END_OF_STREAM = new Object();
68+
69+
private final List<BlockingQueue<T>> queues;
70+
private final Map<Object, Future<?>> actions;
71+
72+
ForkingStreamConsumer(List<BlockingQueue<T>> queues, Map<Object, Future<?>> actions) {
73+
this.queues = queues;
74+
this.actions = actions;
75+
}
76+
77+
@Override
78+
public void accept(T t) {
79+
queues.forEach(q -> q.add(t));
80+
}
81+
82+
@Override
83+
public <R> R get(Object key) {
84+
try {
85+
return ((Future<R>) actions.get(key)).get();
86+
} catch (Exception e) {
87+
throw new RuntimeException(e);
88+
}
89+
}
90+
91+
void finish() {
92+
accept((T) END_OF_STREAM);
93+
}
94+
}
95+
96+
private static class BlockingQueueSpliterator<T> implements Spliterator<T> {
97+
private final BlockingQueue<T> q;
98+
99+
BlockingQueueSpliterator(BlockingQueue<T> q) {
100+
this.q = q;
101+
}
102+
103+
@Override
104+
public boolean tryAdvance(Consumer<? super T> action) {
105+
T t;
106+
while (true) {
107+
try {
108+
t = q.take();
109+
break;
110+
}
111+
catch (InterruptedException e) {
112+
}
113+
}
114+
115+
if (t != ForkingStreamConsumer.END_OF_STREAM) {
116+
action.accept(t);
117+
return true;
118+
}
119+
120+
return false;
121+
}
122+
123+
@Override
124+
public Spliterator<T> trySplit() {
125+
return null;
126+
}
127+
128+
@Override
129+
public long estimateSize() {
130+
return 0;
131+
}
132+
133+
@Override
134+
public int characteristics() {
135+
return 0;
136+
}
137+
}
138+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package lambdasinaction.chap6;
2+
3+
import lambdasinaction.chap5.*;
4+
5+
import static java.util.stream.Collectors.*;
6+
import static lambdasinaction.chap5.Dish.menu;
7+
8+
import java.util.*;
9+
import java.util.stream.*;
10+
11+
public class StreamForkerExample {
12+
13+
public static void main(String[] args) throws Exception {
14+
processMenu();
15+
}
16+
17+
private static void processMenu() {
18+
Stream<Dish> menuStream = menu.stream();
19+
20+
StreamForker.Results results = new StreamForker<Dish>(menuStream)
21+
.fork("shortMenu", s -> s.map(Dish::getName).collect(joining(", ")))
22+
.fork("totalCalories", s -> s.mapToInt(Dish::getCalories).sum())
23+
.fork("mostCaloricDish", s -> s.collect(
24+
reducing((d1, d2) -> d1.getCalories() > d2.getCalories() ? d1 : d2))
25+
.get())
26+
.fork("dishesByType", s -> s.collect(groupingBy(Dish::getType)))
27+
.getResults();
28+
29+
String shortMeny = results.get("shortMenu");
30+
int totalCalories = results.get("totalCalories");
31+
Dish mostCaloricDish = results.get("mostCaloricDish");
32+
Map<Dish.Type, List<Dish>> dishesByType = results.get("dishesByType");
33+
34+
System.out.println("Short menu: " + shortMeny);
35+
System.out.println("Total calories: " + totalCalories);
36+
System.out.println("Most caloric dish: " + mostCaloricDish);
37+
System.out.println("Dishes by type: " + dishesByType);
38+
}
39+
}

0 commit comments

Comments
 (0)