-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathParallelReduce.cs
More file actions
123 lines (105 loc) · 3.9 KB
/
ParallelReduce.cs
File metadata and controls
123 lines (105 loc) · 3.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
using Unity.Burst;
using Unity.Jobs;
using Unity.Collections;
/// <summary>
/// An interface that defines a binary operation.
/// The operation can be any binary operation (sum, difference, multiply, min, max, etc...).
/// </summary>
/// <typeparam name="T">The type of the values to perform the operation on.</typeparam>
public interface IBinaryOperator<T> where T : struct
{
public T Operator(T a, T b);
}
[BurstCompile(CompileSynchronously = true)]
public struct ParallelReduceJob<T, U> : IJobParallelForBatch
where T : struct
where U : struct, IBinaryOperator<T>
{
// The step rate of the source array.
public int Step;
[ReadOnly]
public NativeSlice<T> Src;
[WriteOnly]
public NativeSlice<T> Dst;
/// <summary>
/// The operation to perform on the values of the array.
/// </summary>
public U Operator;
/// <summary>
/// Serial reduction.
/// </summary>
/// <param name="src">The source array to reduce.</param>
/// <returns>The reduced value.</returns>
public static T1 Reduce<T1, U1>(in NativeSlice<T1> src, int step, U1 op)
where T1 : struct
where U1 : struct, IBinaryOperator<T1>
{
T1 val = src[0];
for (int i = step; i < src.Length; i += step)
{
val = op.Operator(val, src[i]);
}
return val;
}
public void Execute(int startIndex, int count)
{
Dst[startIndex] = Reduce(Src.Slice(startIndex, count), Step, Operator);
}
}
public static class ParallelReduce
{
/// <summary>
/// Swap the arrays. This is very efficient for NativeArray as only the internal
/// memory pointer is swapped, not the values of the array.
/// </summary>
/// <typeparam name="T">The value type of the arrays being swapped.</typeparam>
/// <param name="a">The first array to swap with the second.</param>
/// <param name="b">The second array to swap with the first.</param>
private static void Swap<T>(ref NativeArray<T> a, ref NativeArray<T> b)
where T : struct
{
(a, b) = (b, a);
}
/// <summary>
/// Perform a parallel reduction on the elements of the array.
/// </summary>
/// <typeparam name="T">The type of the elements to be reduced.</typeparam>
/// <typeparam name="U">The type of the binary operation to perform on each element of the array.</typeparam>
/// <param name="values">The values to be reduced.</param>
/// <param name="op">The operation to perform on the elements of the array.</param>
/// <returns>The result of the reduction.</returns>
public static T Reduce<T, U>(in NativeArray<T> values, U op)
where T : struct
where U : struct, IBinaryOperator<T>
{
// The number of values to reduce per thread batch.
const int BATCH_SIZE = 1024;
// The step rate for the reduction.
// On the first iteration, this is every value of the source array.
int stepRate = 1;
// How many values to reduce in the current batch.
int batchSize = BATCH_SIZE;
JobHandle job = default;
var src = new NativeArray<T>(values.Length, Allocator.TempJob, NativeArrayOptions.UninitializedMemory);
var dst = new NativeArray<T>(values, Allocator.TempJob);
while (stepRate < values.Length)
{
Swap(ref src, ref dst);
job = new ParallelReduceJob<T, U>
{
Src = src,
Dst = dst,
Step = stepRate,
Operator = op,
}.ScheduleBatch(values.Length, batchSize, job);
// Increment the step rate and batch size.
stepRate = batchSize;
batchSize *= 2;
}
job.Complete();
T res = dst[0];
src.Dispose();
dst.Dispose();
return res;
}
}