Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,23 @@ public static void main(String[] args) throws Exception {
@VisibleForTesting
public static void main(Function<String, String> environmentVarGetter) throws Exception {
JvmInitializers.runOnStartup();
System.out.format("SDK Fn Harness started%n");
System.out.format("Harness ID %s%n", environmentVarGetter.apply(HARNESS_ID));
System.out.format(
"Logging location %s%n", environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR));
System.out.format(
"Control location %s%n", environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR));
System.out.format(
"Status location %s%n", environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR));

Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor =
getApiServiceDescriptor(environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR));
Endpoints.ApiServiceDescriptor controlApiServiceDescriptor =
getApiServiceDescriptor(environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR));
Endpoints.ApiServiceDescriptor statusApiServiceDescriptor =
environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR) == null
? null
: getApiServiceDescriptor(environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR));
String id = environmentVarGetter.apply(HARNESS_ID);

System.out.format("SDK Fn Harness started%n");
System.out.format("Harness ID %s%n", id);
System.out.format("Logging location %s%n", loggingApiServiceDescriptor);
System.out.format("Control location %s%n", controlApiServiceDescriptor);
System.out.format("Status location %s%n", statusApiServiceDescriptor);

String pipelineOptionsJson = environmentVarGetter.apply(PIPELINE_OPTIONS);
// Try looking for a file first. If that exists it should override PIPELINE_OPTIONS to avoid
// maxing out the kernel's environment space
Expand All @@ -179,16 +186,6 @@ public static void main(Function<String, String> environmentVarGetter) throws Ex

PipelineOptions options = PipelineOptionsTranslation.fromJson(pipelineOptionsJson);

Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor =
getApiServiceDescriptor(environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR));

Endpoints.ApiServiceDescriptor controlApiServiceDescriptor =
getApiServiceDescriptor(environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR));

Endpoints.ApiServiceDescriptor statusApiServiceDescriptor =
environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR) == null
? null
: getApiServiceDescriptor(environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR));
String runnerCapabilitesOrNull = environmentVarGetter.apply(RUNNER_CAPABILITIES);
Set<String> runnerCapabilites =
runnerCapabilitesOrNull == null
Expand Down Expand Up @@ -415,7 +412,7 @@ private BeamFnApi.ProcessBundleDescriptor loadDescriptor(String id) {
// directExecutor() when building the channel.
BeamFnControlClient control =
new BeamFnControlClient(
controlStub.withExecutor(MoreExecutors.directExecutor()),
controlStub.withExecutor(MoreExecutors.directExecutor()).withWaitForReady(),
outboundObserverFactory,
executorService,
handlers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ public StreamWriter(ManagedChannel channel) {
this.streamPhaser = new AdvancingPhaser(1);
this.channel = channel;

BeamFnLoggingGrpc.BeamFnLoggingStub stub = BeamFnLoggingGrpc.newStub(channel);
BeamFnLoggingGrpc.BeamFnLoggingStub stub =
BeamFnLoggingGrpc.newStub(channel).withWaitForReady();
this.inboundObserver = new LogControlObserver();
this.outboundObserver =
new DirectStreamObserver<BeamFnApi.LogEntry.List>(
Expand Down
Loading