33import akka .actor .ActorRef ;
44import akka .actor .ActorSystem ;
55import akka .actor .AllDeadLetters ;
6+ import com .google .protobuf .Any ;
67import com .google .protobuf .Empty ;
8+ import com .google .rpc .Code ;
9+ import com .google .rpc .DebugInfo ;
710import io .grpc .Status ;
11+ import io .grpc .protobuf .StatusProto ;
812import io .grpc .stub .StreamObserver ;
913import io .numaproj .numaflow .reduce .v1 .ReduceGrpc ;
1014import io .numaproj .numaflow .reduce .v1 .ReduceOuterClass ;
1115import io .numaproj .numaflow .reducestreamer .model .IntervalWindow ;
1216import io .numaproj .numaflow .reducestreamer .model .Metadata ;
1317import io .numaproj .numaflow .reducestreamer .model .ReduceStreamer ;
1418import io .numaproj .numaflow .reducestreamer .model .ReduceStreamerFactory ;
19+ import io .numaproj .numaflow .shared .ExceptionUtils ;
1520import io .numaproj .numaflow .shared .GrpcServerUtils ;
1621import lombok .extern .slf4j .Slf4j ;
1722
@@ -38,8 +43,15 @@ static void handleFailure(
3843 failureFuture .get ();
3944 } catch (Exception e ) {
4045 e .printStackTrace ();
41- var status = Status .UNKNOWN .withDescription (e .getMessage ()).withCause (e );
42- responseObserver .onError (status .asException ());
46+ com .google .rpc .Status status = com .google .rpc .Status .newBuilder ()
47+ .setCode (Code .INTERNAL .getNumber ())
48+ .setMessage (
49+ ExceptionUtils .getExceptionErrorString () + ": " + (e .getMessage () != null ? e .getMessage () : "" ))
50+ .addDetails (Any .pack (DebugInfo .newBuilder ()
51+ .setDetail (ExceptionUtils .getStackTrace (e ))
52+ .build ()))
53+ .build ();
54+ responseObserver .onError (StatusProto .toStatusRuntimeException (status ));
4355 }
4456 }).start ();
4557 }
0 commit comments