Skip to content

Commit d3ad5d0

Browse files
authored
chore: construct window metadata using request (#195)
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
1 parent da032c6 commit d3ad5d0

8 files changed

Lines changed: 81 additions & 59 deletions

File tree

examples/src/main/java/io/numaproj/numaflow/examples/reduce/sum/SumFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
public class SumFactory extends ReducerFactory<SumFunction> {
99

1010
public static void main(String[] args) throws Exception {
11-
log.info("sum udf was invoked");
11+
log.info("Starting sum udf server");
1212
Server server = new Server(new SumFactory());
1313

1414
// Start the server

examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer/sum/SumFunction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public void processMessage(
2929
log.info("error while parsing integer - {}", e.getMessage());
3030
}
3131
if (sum >= 100) {
32-
outputStreamObserver.send(new Message(String.valueOf(sum).getBytes()));
32+
outputStreamObserver.send(new Message(String.valueOf(sum).getBytes(), keys));
3333
sum = 0;
3434
}
3535
}

src/main/java/io/numaproj/numaflow/reducer/ReduceSupervisorActor.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import com.google.common.base.Preconditions;
1111
import io.grpc.stub.StreamObserver;
1212
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
13+
import io.numaproj.numaflow.reducer.metadata.IntervalWindowImpl;
14+
import io.numaproj.numaflow.reducer.metadata.MetadataImpl;
1315
import lombok.extern.slf4j.Slf4j;
1416
import scala.PartialFunction;
1517
import scala.collection.Iterable;
@@ -25,31 +27,26 @@
2527
@Slf4j
2628
class ReduceSupervisorActor extends AbstractActor {
2729
private final ReducerFactory<? extends Reducer> reducerFactory;
28-
private final Metadata md;
2930
private final ActorRef shutdownActor;
3031
private final StreamObserver<ReduceOuterClass.ReduceResponse> responseObserver;
3132
private final Map<String, ActorRef> actorsMap = new HashMap<>();
3233

3334
public ReduceSupervisorActor(
3435
ReducerFactory<? extends Reducer> reducerFactory,
35-
Metadata md,
3636
ActorRef shutdownActor,
3737
StreamObserver<ReduceOuterClass.ReduceResponse> responseObserver) {
3838
this.reducerFactory = reducerFactory;
39-
this.md = md;
4039
this.shutdownActor = shutdownActor;
4140
this.responseObserver = responseObserver;
4241
}
4342

4443
public static Props props(
4544
ReducerFactory<? extends Reducer> reducerFactory,
46-
Metadata md,
4745
ActorRef shutdownActor,
4846
StreamObserver<ReduceOuterClass.ReduceResponse> responseObserver) {
4947
return Props.create(
5048
ReduceSupervisorActor.class,
5149
reducerFactory,
52-
md,
5350
shutdownActor,
5451
responseObserver);
5552
}
@@ -92,8 +89,18 @@ public Receive createReceive() {
9289
private void invokeActors(ActorRequest actorRequest) {
9390
String[] keys = actorRequest.getKeySet();
9491
String uniqueId = actorRequest.getUniqueIdentifier();
92+
ReduceOuterClass.Window window = actorRequest.getRequest().getOperation().getWindows(0);
9593
if (!actorsMap.containsKey(uniqueId)) {
9694
Reducer reduceHandler = reducerFactory.createReducer();
95+
// create metadata
96+
IntervalWindow iw = new IntervalWindowImpl(
97+
Instant.ofEpochSecond(
98+
window.getStart().getSeconds(),
99+
window.getStart().getNanos()),
100+
Instant.ofEpochSecond(
101+
window.getEnd().getSeconds(),
102+
window.getEnd().getNanos()));
103+
Metadata md = new MetadataImpl(iw);
97104
ActorRef actorRef = getContext()
98105
.actorOf(ReduceActor.props(keys, md, reduceHandler));
99106
actorsMap.put(uniqueId, actorRef);

src/main/java/io/numaproj/numaflow/reducer/Service.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -55,18 +55,6 @@ public StreamObserver<ReduceOuterClass.ReduceRequest> reduceFn(final StreamObser
5555
responseObserver);
5656
}
5757

58-
// get window start and end time from gPRC metadata
59-
String winSt = GrpcServerUtils.WINDOW_START_TIME.get();
60-
String winEt = GrpcServerUtils.WINDOW_END_TIME.get();
61-
62-
// convert the start and end time to Instant
63-
Instant startTime = Instant.ofEpochMilli(Long.parseLong(winSt));
64-
Instant endTime = Instant.ofEpochMilli(Long.parseLong(winEt));
65-
66-
// create metadata
67-
IntervalWindow iw = new IntervalWindowImpl(startTime, endTime);
68-
Metadata md = new MetadataImpl(iw);
69-
7058
CompletableFuture<Void> failureFuture = new CompletableFuture<>();
7159

7260
// create a shutdown actor that listens to exceptions.
@@ -84,7 +72,6 @@ public StreamObserver<ReduceOuterClass.ReduceRequest> reduceFn(final StreamObser
8472
ActorRef supervisorActor = reduceActorSystem
8573
.actorOf(ReduceSupervisorActor.props(
8674
reducerFactory,
87-
md,
8875
shutdownActorRef,
8976
responseObserver));
9077

src/test/java/io/numaproj/numaflow/reducer/ServerErrTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.numaproj.numaflow.reducer;
22

33
import com.google.protobuf.ByteString;
4+
import com.google.protobuf.Timestamp;
45
import io.grpc.Context;
56
import io.grpc.Contexts;
67
import io.grpc.ManagedChannel;
@@ -128,6 +129,15 @@ public void given_reducerThrows_when_serverRuns_then_outputStreamContainsThrowab
128129
.addKeys("reduce-key")
129130
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
130131
.build())
132+
.setOperation(ReduceOuterClass.ReduceRequest.WindowOperation
133+
.newBuilder()
134+
.addWindows(
135+
ReduceOuterClass.Window
136+
.newBuilder()
137+
.setStart(Timestamp.newBuilder().setSeconds(60000).build())
138+
.setEnd(Timestamp.newBuilder().setSeconds(120000).build())
139+
.build()
140+
))
131141
.build();
132142
inputStreamObserver.onNext(reduceRequest);
133143
}

src/test/java/io/numaproj/numaflow/reducer/ServerTest.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.numaproj.numaflow.reducer;
22

33
import com.google.protobuf.ByteString;
4+
import com.google.protobuf.Timestamp;
45
import io.grpc.Context;
56
import io.grpc.Contexts;
67
import io.grpc.ManagedChannel;
@@ -91,16 +92,11 @@ public void tearDown() throws Exception {
9192
public void given_inputReduceRequestsShareSameKey_when_serverStarts_then_allRequestsGetAggregatedToOneResponse() {
9293
String reduceKey = "reduce-key";
9394

94-
Metadata metadata = new Metadata();
95-
metadata.put(Metadata.Key.of(WIN_START_KEY, Metadata.ASCII_STRING_MARSHALLER), "60000");
96-
metadata.put(Metadata.Key.of(WIN_END_KEY, Metadata.ASCII_STRING_MARSHALLER), "120000");
97-
9895
// create an output stream observer
9996
ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver();
10097

10198
StreamObserver<ReduceOuterClass.ReduceRequest> inputStreamObserver = ReduceGrpc
10299
.newStub(inProcessChannel)
103-
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata))
104100
.reduceFn(outputStreamObserver);
105101

106102
for (int i = 1; i <= 10; i++) {
@@ -110,6 +106,15 @@ public void given_inputReduceRequestsShareSameKey_when_serverStarts_then_allRequ
110106
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
111107
.addAllKeys(List.of(reduceKey))
112108
.build())
109+
.setOperation(ReduceOuterClass.ReduceRequest.WindowOperation
110+
.newBuilder()
111+
.addWindows(
112+
ReduceOuterClass.Window
113+
.newBuilder()
114+
.setStart(Timestamp.newBuilder().setSeconds(60000).build())
115+
.setEnd(Timestamp.newBuilder().setSeconds(120000).build())
116+
.build()
117+
))
113118
.build();
114119
inputStreamObserver.onNext(request);
115120
}
@@ -143,16 +148,11 @@ public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then
143148
String reduceKey = "reduce-key";
144149
int keyCount = 10;
145150

146-
Metadata metadata = new Metadata();
147-
metadata.put(Metadata.Key.of(WIN_START_KEY, Metadata.ASCII_STRING_MARSHALLER), "60000");
148-
metadata.put(Metadata.Key.of(WIN_END_KEY, Metadata.ASCII_STRING_MARSHALLER), "120000");
149-
150151
// create an output stream observer
151152
ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver();
152153

153154
StreamObserver<ReduceOuterClass.ReduceRequest> inputStreamObserver = ReduceGrpc
154155
.newStub(inProcessChannel)
155-
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata))
156156
.reduceFn(outputStreamObserver);
157157

158158
// send messages with keyCount different keys
@@ -164,6 +164,15 @@ public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then
164164
.addAllKeys(List.of(reduceKey + j))
165165
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
166166
.build())
167+
.setOperation(ReduceOuterClass.ReduceRequest.WindowOperation
168+
.newBuilder()
169+
.addWindows(
170+
ReduceOuterClass.Window
171+
.newBuilder()
172+
.setStart(Timestamp.newBuilder().setSeconds(60000).build())
173+
.setEnd(Timestamp.newBuilder().setSeconds(120000).build())
174+
.build()
175+
))
167176
.build();
168177
inputStreamObserver.onNext(request);
169178
}

src/test/java/io/numaproj/numaflow/reducer/ShutDownActorTest.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,10 @@
55
import akka.actor.AllDeadLetters;
66
import akka.actor.DeadLetter;
77
import com.google.protobuf.ByteString;
8+
import com.google.protobuf.Timestamp;
89
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
9-
import io.numaproj.numaflow.reducer.metadata.IntervalWindowImpl;
10-
import io.numaproj.numaflow.reducer.metadata.MetadataImpl;
1110
import org.junit.Test;
1211

13-
import java.time.Instant;
1412
import java.util.concurrent.CompletableFuture;
1513

1614
import static org.junit.Assert.assertEquals;
@@ -33,14 +31,10 @@ public void testFailure() {
3331
.actorOf(ReduceShutdownActor
3432
.props(completableFuture));
3533

36-
Metadata md = new MetadataImpl(
37-
new IntervalWindowImpl(Instant.now(), Instant.now()));
38-
3934
ActorRef supervisorActor = actorSystem
4035
.actorOf(ReduceSupervisorActor
4136
.props(
4237
new TestExceptionFactory(),
43-
md,
4438
shutdownActor,
4539
new ReduceOutputStreamObserver()));
4640

@@ -49,14 +43,23 @@ public void testFailure() {
4943
.addKeys("reduce-test")
5044
.setValue(ByteString.copyFromUtf8(String.valueOf(1)))
5145
.build())
46+
.setOperation(ReduceOuterClass.ReduceRequest.WindowOperation
47+
.newBuilder()
48+
.addWindows(
49+
ReduceOuterClass.Window
50+
.newBuilder()
51+
.setStart(Timestamp.newBuilder().setSeconds(60000).build())
52+
.setEnd(Timestamp.newBuilder().setSeconds(60000).build())
53+
.build()
54+
))
5255
.build());
5356
supervisorActor.tell(reduceRequest, ActorRef.noSender());
5457

5558
try {
5659
completableFuture.get();
5760
fail("Expected the future to complete with exception");
5861
} catch (Exception e) {
59-
assertEquals(e.getMessage(), "java.lang.RuntimeException: UDF Failure");
62+
assertEquals("java.lang.RuntimeException: UDF Failure", e.getMessage());
6063
}
6164
}
6265

@@ -71,14 +74,10 @@ public void testDeadLetterHandling() {
7174

7275
actorSystem.eventStream().subscribe(shutdownActor, AllDeadLetters.class);
7376

74-
Metadata md = new MetadataImpl(
75-
new IntervalWindowImpl(Instant.now(), Instant.now()));
76-
7777
ActorRef supervisorActor = actorSystem
7878
.actorOf(ReduceSupervisorActor
7979
.props(
8080
new TestExceptionFactory(),
81-
md,
8281
shutdownActor,
8382
new ReduceOutputStreamObserver()));
8483

@@ -89,7 +88,7 @@ public void testDeadLetterHandling() {
8988
completableFuture.get();
9089
fail("Expected the future to complete with exception");
9190
} catch (Exception e) {
92-
assertEquals(e.getMessage(), "java.lang.Throwable: dead letters");
91+
assertEquals("java.lang.Throwable: dead letters", e.getMessage());
9392
}
9493
}
9594

src/test/java/io/numaproj/numaflow/reducer/SupervisorActorTest.java

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,10 @@
33
import akka.actor.ActorRef;
44
import akka.actor.ActorSystem;
55
import com.google.protobuf.ByteString;
6+
import com.google.protobuf.Timestamp;
67
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
7-
import io.numaproj.numaflow.reducer.metadata.IntervalWindowImpl;
8-
import io.numaproj.numaflow.reducer.metadata.MetadataImpl;
98
import org.junit.Test;
109

11-
import java.time.Instant;
1210
import java.util.Arrays;
1311
import java.util.List;
1412
import java.util.concurrent.CompletableFuture;
@@ -29,14 +27,11 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then
2927
.actorOf(ReduceShutdownActor
3028
.props(completableFuture));
3129

32-
Metadata md = new MetadataImpl(
33-
new IntervalWindowImpl(Instant.now(), Instant.now()));
34-
3530
ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver();
3631

3732
ActorRef supervisorActor = actorSystem
3833
.actorOf(ReduceSupervisorActor
39-
.props(new TestReducerFactory(), md, shutdownActor, outputStreamObserver));
34+
.props(new TestReducerFactory(), shutdownActor, outputStreamObserver));
4035

4136
for (int i = 1; i <= 10; i++) {
4237
ActorRequest reduceRequest = new ActorRequest(ReduceOuterClass.ReduceRequest
@@ -47,6 +42,15 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then
4742
.addAllKeys(Arrays.asList("key-1", "key-2"))
4843
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
4944
.build())
45+
.setOperation(ReduceOuterClass.ReduceRequest.WindowOperation
46+
.newBuilder()
47+
.addWindows(
48+
ReduceOuterClass.Window
49+
.newBuilder()
50+
.setStart(Timestamp.newBuilder().setSeconds(60000).build())
51+
.setEnd(Timestamp.newBuilder().setSeconds(60000).build())
52+
.build()
53+
))
5054
.build());
5155
supervisorActor.tell(reduceRequest, ActorRef.noSender());
5256
}
@@ -57,11 +61,12 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then
5761
List<ReduceOuterClass.ReduceResponse> result = outputStreamObserver.resultDatum.get();
5862
// the observer should receive 2 messages, one is the aggregated result, the other is the EOF response.
5963
assertEquals(2, result.size());
60-
assertEquals("10", result
61-
.get(0)
62-
.getResult()
63-
.getValue()
64-
.toStringUtf8());
64+
assertEquals(
65+
"10", result
66+
.get(0)
67+
.getResult()
68+
.getValue()
69+
.toStringUtf8());
6570
assertTrue(result
6671
.get(1)
6772
.getEOF());
@@ -80,15 +85,11 @@ public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcas
8085
.actorOf(ReduceShutdownActor
8186
.props(completableFuture));
8287

83-
Metadata md = new MetadataImpl(
84-
new IntervalWindowImpl(Instant.now(), Instant.now()));
85-
8688
ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver();
8789
ActorRef supervisorActor = actorSystem
8890
.actorOf(ReduceSupervisorActor
8991
.props(
9092
new TestReducerFactory(),
91-
md,
9293
shutdownActor,
9394
outputStreamObserver)
9495
);
@@ -102,6 +103,15 @@ public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcas
102103
.addAllKeys(Arrays.asList("shared-key", "unique-key-" + i))
103104
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
104105
.build())
106+
.setOperation(ReduceOuterClass.ReduceRequest.WindowOperation
107+
.newBuilder()
108+
.addWindows(
109+
ReduceOuterClass.Window
110+
.newBuilder()
111+
.setStart(Timestamp.newBuilder().setSeconds(60000).build())
112+
.setEnd(Timestamp.newBuilder().setSeconds(60000).build())
113+
.build()
114+
))
105115
.build());
106116
supervisorActor.tell(reduceRequest, ActorRef.noSender());
107117
}

0 commit comments

Comments
 (0)