Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.tez.runtime.task;


import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
Expand All @@ -33,13 +34,15 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import javax.annotation.Nullable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
Expand All @@ -51,6 +54,8 @@
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.log4j.helpers.ThreadLocalMap;
import org.apache.tez.client.registry.AMRecord;
import org.apache.tez.client.registry.zookeeper.ZkAMRegistryClient;
import org.apache.tez.common.ContainerContext;
import org.apache.tez.common.ContainerTask;
import org.apache.tez.common.Preconditions;
Expand All @@ -64,6 +69,7 @@
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.Limits;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.common.security.TokenCache;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
Expand Down Expand Up @@ -517,45 +523,92 @@ public static TezChild newTezChild(Configuration conf, String host, int port, St
hadoopShim);
}

public static void main(String[] args) throws IOException, InterruptedException, TezException {
public static void main(String[] args) throws Exception {
TezClassLoader.setupTezClassLoader();
final Configuration defaultConf = new Configuration();

String frameworkMode = System.getenv("TEZ_FRAMEWORK_MODE");
if (frameworkMode == null || frameworkMode.isEmpty()) {
frameworkMode = defaultConf.get("tez.framework.mode", "YARN");
}

String host, appId, tokenIdentifier, containerIdentifier;
int port, attemptNumber;
Credentials credentials = new Credentials();

if ("STANDALONE_ZOOKEEPER".equalsIgnoreCase(frameworkMode)) {
DAGProtos.ConfigurationProto confProtoBefore = TezUtilsInternal.loadConfProtoFromText();
TezUtilsInternal.addUserSpecifiedTezConfiguration(
defaultConf, confProtoBefore.getConfKeyValuesList());

ZkAMRegistryClient registry = ZkAMRegistryClient.getClient(defaultConf);
registry.start();

while (!registry.isInitialized()) {
TimeUnit.SECONDS.sleep(5);
}

List<AMRecord> records = registry.getAllRecords();
if (records.isEmpty()) {
throw new RuntimeException("No AM found in ZooKeeper registry");
}
AMRecord amRecord = records.getFirst();

host = amRecord.getHostName();
port = amRecord.getPort();
appId = amRecord.getApplicationId().toString();
tokenIdentifier = appId;
attemptNumber = 1;
containerIdentifier = "container_" + appId + "_01_000001";

// FIX: Dummy token
JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(appId));
Token<JobTokenIdentifier> sessionToken =
new Token<>(identifier, new JobTokenSecretManager(defaultConf));
credentials.addToken(new Text("SessionToken"), sessionToken);

LOG.info("ZK Mode: Discovered AM {} at {}:{}", appId, host, port);

} else {
assert args.length == 5;
host = args[0];
port = Integer.parseInt(args[1]);
containerIdentifier = args[2];
tokenIdentifier = args[3];
attemptNumber = Integer.parseInt(args[4]);

DAGProtos.ConfigurationProto confProto =
TezUtilsInternal.readUserSpecifiedTezConfiguration(System.getenv(Environment.PWD.name()));
TezUtilsInternal.addUserSpecifiedTezConfiguration(
defaultConf, confProto.getConfKeyValuesList());
}
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
final String pid = System.getenv().get("JVM_PID");
String[] localDirs =
TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name()));


assert args.length == 5;
String host = args[0];
int port = Integer.parseInt(args[1]);
final String containerIdentifier = args[2];
final String tokenIdentifier = args[3];
final int attemptNumber = Integer.parseInt(args[4]);
final String[] localDirs = TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS
.name()));
CallerContext.setCurrent(new CallerContext.Builder("tez_"+tokenIdentifier).build());
LOG.info("TezChild starting with PID=" + pid + ", containerIdentifier=" + containerIdentifier);
CallerContext.setCurrent(new CallerContext.Builder("tez_" + tokenIdentifier).build());
LOG.info("TezChild starting with PID={}, containerIdentifier={}", pid, containerIdentifier);
if (LOG.isDebugEnabled()) {
LOG.debug("Info from cmd line: AM-host: " + host + " AM-port: " + port
+ " containerIdentifier: " + containerIdentifier + " appAttemptNumber: " + attemptNumber
+ " tokenIdentifier: " + tokenIdentifier);
LOG.debug(
"Info from cmd line: AM-host: {} AM-port: {} containerIdentifier: {} appAttemptNumber: {} tokenIdentifier: {}",
host,
port,
containerIdentifier,
attemptNumber,
tokenIdentifier);
credentials = UserGroupInformation.getCurrentUser().getCredentials();
}

// Security framework already loaded the tokens into current ugi
DAGProtos.ConfigurationProto confProto =
TezUtilsInternal.readUserSpecifiedTezConfiguration(System.getenv(Environment.PWD.name()));
TezUtilsInternal.addUserSpecifiedTezConfiguration(defaultConf, confProto.getConfKeyValuesList());
UserGroupInformation.setConfiguration(defaultConf);
Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();

HadoopShim hadoopShim = new HadoopShimsLoader(defaultConf).getHadoopShim();

// log the system properties
if (LOG.isInfoEnabled()) {
String systemPropsToLog = TezCommonUtils.getSystemPropertiesToLog(defaultConf);
if (systemPropsToLog != null) {
LOG.info(systemPropsToLog);
}
LOG.info(systemPropsToLog);
}

TezChild tezChild = newTezChild(defaultConf, host, port, containerIdentifier,
Expand Down