diff --git a/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/JStormOnYarn.java b/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/JStormOnYarn.java index 80749457c..4e8c7c864 100644 --- a/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/JStormOnYarn.java +++ b/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/JStormOnYarn.java @@ -66,7 +66,6 @@ */ public class JstormOnYarn { private static final Log LOG = LogFactory.getLog(JstormOnYarn.class); - // Main class to invoke application master private final String appMasterMainClass; private JstormClientContext jstormClientContext = new JstormClientContext(); @@ -95,54 +94,20 @@ private void printUsage() { /** * Parse command line options - * - * @param args Parsed command line options - * @return Whether the init was successful to run the client - * @throws ParseException */ public boolean init(String[] args) throws ParseException { CommandLine cliParser = new GnuParser().parse(jstormClientContext.opts, args); - if (args.length == 0) { - throw new IllegalArgumentException("No args specified for client to initialize"); - } - if (cliParser.hasOption(JOYConstants.LOG_PROPERTIES)) { - String log4jPath = cliParser.getOptionValue(JOYConstants.LOG_PROPERTIES); - try { - Log4jPropertyHelper.updateLog4jConfiguration(JstormOnYarn.class, log4jPath); - } catch (Exception e) { - LOG.warn("Can not set up custom log4j properties. " + e); - } - } if (cliParser.hasOption(JOYConstants.HELP)) { printUsage(); return false; } - if (cliParser.hasOption(JOYConstants.HELP)) { - jstormClientContext.debugFlag = true; - } - if (cliParser.hasOption(JOYConstants.KEEP_CONTAINERS_ACROSS_APPLICATION_ATTEMPTS)) { - jstormClientContext.keepContainers = true; - } jstormClientContext.appName = cliParser.getOptionValue(JOYConstants.APP_NAME_KEY, JOYConstants.CLIIENT_CLASS); jstormClientContext.amPriority = Integer.parseInt(cliParser.getOptionValue(JOYConstants.PRIORITY, JOYConstants.DEFAULT_PRIORITY)); jstormClientContext.amQueue = cliParser.getOptionValue(JOYConstants.QUEUE, JOYConstants.QUEUE_NAME); jstormClientContext.amMemory = Integer.parseInt(cliParser.getOptionValue(JOYConstants.MASTER_MEMORY, JOYConstants.DEFAULT_MASTER_MEMORY)); jstormClientContext.amVCores = Integer.parseInt(cliParser.getOptionValue(JOYConstants.MASTER_VCORES, JOYConstants.DEFAULT_MASTER_VCORES)); - if (jstormClientContext.amMemory < 0) { - throw new IllegalArgumentException("Invalid memory specified for application master, exiting." - + " Specified memory=" + jstormClientContext.amMemory); - } - if (jstormClientContext.amVCores < 0) { - throw new IllegalArgumentException("Invalid virtual cores specified for application master, exiting." - + " Specified virtual cores=" + jstormClientContext.amVCores); - } - - if (!cliParser.hasOption(JOYConstants.JAR)) { - throw new IllegalArgumentException("No jar file specified for application master"); - } - jstormClientContext.appMasterJar = cliParser.getOptionValue(JOYConstants.JAR); jstormClientContext.libJars = cliParser.getOptionValue(JOYConstants.LIB_JAR); jstormClientContext.homeDir = cliParser.getOptionValue(JOYConstants.HOME_DIR); @@ -151,116 +116,9 @@ public boolean init(String[] args) throws ParseException { jstormClientContext.nameNodeHost = cliParser.getOptionValue(JOYConstants.NN_ADDRESS, JOYConstants.EMPTY); jstormClientContext.deployPath = cliParser.getOptionValue(JOYConstants.DEPLOY_PATH, JOYConstants.EMPTY); jstormClientContext.hadoopConfDir = cliParser.getOptionValue(JOYConstants.HADOOP_CONF_DIR, JOYConstants.EMPTY); - jstormClientContext.instanceName = cliParser.getOptionValue(JOYConstants.INSTANCE_NAME, JOYConstants.EMPTY); - LOG.info("Application client instance name:" + jstormClientContext.instanceName); - - if (!jstormClientContext.rmHost.equals(JOYConstants.EMPTY)) { - jstormClientContext.conf.set(JOYConstants.RM_ADDRESS_KEY, jstormClientContext.rmHost, JOYConstants.YARN_CONF_MODE); - } - if (!jstormClientContext.nameNodeHost.equals(JOYConstants.EMPTY)) { - jstormClientContext.conf.set(JOYConstants.FS_DEFAULTFS_KEY, jstormClientContext.nameNodeHost); - } - LOG.info(JstormOnYarn.class.getProtectionDomain() - .getCodeSource().getLocation().getPath()); - String jarPath = JstormOnYarn.class.getProtectionDomain() - .getCodeSource().getLocation().getPath(); - if (jstormClientContext.confFile == null) { - JstormYarnUtils.getYarnConfFromJar(jarPath); - this.jstormClientContext.conf.addResource(JOYConstants.CONF_NAME); - } else { - Path jstormyarnConfPath = new Path(jstormClientContext.confFile); - LOG.info(jstormyarnConfPath.getName()); - this.jstormClientContext.conf.addResource(jstormyarnConfPath); - } - - if (!StringUtils.isBlank(jstormClientContext.hadoopConfDir)) { - try { - Collection files = FileUtils.listFiles(new File(jstormClientContext.hadoopConfDir), new String[]{JOYConstants.XML}, true); - for (File file : files) { - LOG.info("adding hadoop conf file to conf: " + file.getAbsolutePath()); - this.jstormClientContext.conf.addResource(file.getAbsolutePath()); - } - } catch (Exception ex) { - LOG.error("failed to list hadoop conf dir: " + jstormClientContext.hadoopConfDir); - } - } - - if (!cliParser.hasOption(JOYConstants.SHELL_SCRIPT)) { - String jarShellScriptPath = jarPath + JOYConstants.START_JSTORM_SHELL; - try { - InputStream stream = new FileInputStream(jarShellScriptPath); - FileOutputStream out = new FileOutputStream(JOYConstants.START_JSTORM_SHELL); - out.write(IOUtils.toByteArray(stream)); - out.close(); - jstormClientContext.shellScriptPath = JOYConstants.START_JSTORM_SHELL; - } catch (Exception e) { - throw new IllegalArgumentException( - "No shell script specified to be executed by application master to start nimbus and supervisor"); - } - } else if (cliParser.hasOption(JOYConstants.SHELL_COMMAND) && cliParser.hasOption(JOYConstants.SHELL_SCRIPT)) { - throw new IllegalArgumentException("Can not specify shell_command option " + - "and shell_script option at the same time"); - } else if (cliParser.hasOption(JOYConstants.SHELL_COMMAND)) { - jstormClientContext.shellCommand = cliParser.getOptionValue(JOYConstants.SHELL_COMMAND); - } else { - jstormClientContext.shellScriptPath = cliParser.getOptionValue(JOYConstants.SHELL_SCRIPT); - } - if (cliParser.hasOption(JOYConstants.SHELL_ARGS)) { - jstormClientContext.shellArgs = cliParser.getOptionValues(JOYConstants.SHELL_ARGS); - } - if (cliParser.hasOption(JOYConstants.SHELL_ENV)) { - String envs[] = cliParser.getOptionValues(JOYConstants.SHELL_ENV); - for (String env : envs) { - env = env.trim(); - int index = env.indexOf(JOYConstants.EQUAL); - if (index == -1) { - jstormClientContext.shellEnv.put(env, JOYConstants.EMPTY); - continue; - } - String key = env.substring(0, index); - String val = JOYConstants.EMPTY; - if (index < (env.length() - 1)) { - val = env.substring(index + 1); - } - jstormClientContext.shellEnv.put(key, val); - } - } - jstormClientContext.shellCmdPriority = Integer.parseInt(cliParser.getOptionValue(JOYConstants.SHELL_CMD_PRIORITY, JOYConstants.SHELL_CMD_PRIORITY_DEFAULT_VALUE)); - //set AM memory default to 1000mb - jstormClientContext.containerMemory = Integer.parseInt(cliParser.getOptionValue(JOYConstants.CONTAINER_MEMORY, JOYConstants.DEFAULT_CONTAINER_MEMORY)); - jstormClientContext.containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(JOYConstants.CONTAINER_VCORES, JOYConstants.DEFAULT_CONTAINER_VCORES)); - jstormClientContext.numContainers = Integer.parseInt(cliParser.getOptionValue(JOYConstants.NUM_CONTAINERS, JOYConstants.DEFAULT_NUM_CONTAINER)); - - if (jstormClientContext.containerMemory < 0 || jstormClientContext.containerVirtualCores < 0 || jstormClientContext.numContainers < 1) { - throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified," - + " exiting." - + " Specified containerMemory=" + jstormClientContext.containerMemory - + ", containerVirtualCores=" + jstormClientContext.containerVirtualCores - + ", numContainer=" + jstormClientContext.numContainers); - } - - jstormClientContext.nodeLabelExpression = cliParser.getOptionValue(JOYConstants.NODE_LABEL_EXPRESSION, null); - jstormClientContext.clientTimeout = Integer.parseInt(cliParser.getOptionValue(JOYConstants.TIMEOUT, JOYConstants.DEFAULT_CLIENT_TIME_OUT)); - - jstormClientContext.attemptFailuresValidityInterval = - Long.parseLong(cliParser.getOptionValue( - JOYConstants.ATTEMPT_FAILURES_VALIDITY_INTERVAL, JOYConstants.DEFAULT_ATTEMPT_FAILURES_VALIDITY_INTERVAL)); - - jstormClientContext.log4jPropFile = cliParser.getOptionValue(JOYConstants.LOG_PROPERTIES, JOYConstants.EMPTY); - - // Get timeline domain options - if (cliParser.hasOption(JOYConstants.DOMAIN)) { - jstormClientContext.domainId = cliParser.getOptionValue(JOYConstants.DOMAIN); - jstormClientContext.toCreateDomain = cliParser.hasOption(JOYConstants.CREATE); - if (cliParser.hasOption(JOYConstants.VIEW_ACLS)) { - jstormClientContext.viewACLs = cliParser.getOptionValue(JOYConstants.VIEW_ACLS); - } - if (cliParser.hasOption(JOYConstants.MODIFY_ACLS)) { - jstormClientContext.modifyACLs = cliParser.getOptionValue(JOYConstants.MODIFY_ACLS); - } - } + JstormYarnUtils.checkAndSetOptions(cliParser, jstormClientContext); return true; } @@ -349,7 +207,6 @@ public boolean run() throws IOException, YarnException { LOG.info("Copy App Master jar from local filesystem and add to local environment"); // Copy the application master jar to the filesystem - // Create a local resource to point to the destination jar path FileSystem fs = FileSystem.get(jstormClientContext.conf); addToLocalResources(fs, jstormClientContext.appMasterJar, JOYConstants.appMasterJarPath, appId.toString(), localResources, null); @@ -533,9 +390,7 @@ public boolean run() throws IOException, YarnException { // Set the queue to which this application is to be submitted in the RM appContext.setQueue(jstormClientContext.amQueue); - // Submit the application to the applications manager LOG.info("Submitting application to ASM"); - LOG.info("conf instanceName : " + jstormClientContext.conf.get(JOYConstants.INSTANCE_NAME_KEY)); //check configuration if (JstormYarnUtils.isUnset(jstormClientContext.conf.get(JOYConstants.INSTANCE_NAME_KEY))) { diff --git a/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/appmaster/JstormMaster.java b/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/appmaster/JstormMaster.java index 58bfb379e..57dea58f8 100644 --- a/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/appmaster/JstormMaster.java +++ b/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/appmaster/JstormMaster.java @@ -24,6 +24,7 @@ import com.alibaba.jstorm.yarn.model.STARTType; import com.alibaba.jstorm.yarn.registry.SlotPortsView; import com.alibaba.jstorm.yarn.server.AMServer; +import com.alibaba.jstorm.yarn.utils.JstormYarnUtils; import com.alibaba.jstorm.yarn.utils.PortScanner; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; @@ -129,7 +130,7 @@ public static void main(String[] args) { LOG.info("Initializing Jstorm Master!"); boolean doRun = appMaster.init(args); if (!doRun) { - System.exit(0); + System.exit(JOYConstants.EXIT_SUCCESS); } appMaster.run(); // LRS won't finish at all @@ -137,46 +138,17 @@ public static void main(String[] args) { } catch (Throwable t) { LOG.fatal("Error running JstormMaster", t); LogManager.shutdown(); - ExitUtil.terminate(1, t); + ExitUtil.terminate(JOYConstants.EXIT_FAIL1, t); } if (result) { LOG.info("Application Master completed successfully. exiting"); - System.exit(0); + System.exit(JOYConstants.EXIT_SUCCESS); } else { LOG.info("Application Master failed. exiting"); - System.exit(2); + System.exit(JOYConstants.EXIT_FAIL2); } } - /** - * Dump out contents of $CWD and the environment to stdout for debugging - */ - private void dumpOutDebugInfo() { - - LOG.info("Dump debug output"); - Map envs = System.getenv(); - for (Map.Entry env : envs.entrySet()) { - LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue()); - System.out.println("System env: key=" + env.getKey() + ", val=" - + env.getValue()); - } - - BufferedReader buf = null; - try { - String lines = Shell.WINDOWS ? Shell.execCommand("cmd", "/c", "dir") : - Shell.execCommand("ls", "-al"); - buf = new BufferedReader(new StringReader(lines)); - String line = ""; - while ((line = buf.readLine()) != null) { - LOG.info("System CWD content: " + line); - System.out.println("System CWD content: " + line); - } - } catch (IOException e) { - e.printStackTrace(); - } finally { - IOUtils.cleanup(LOG, buf); - } - } public JstormMaster() { // Set up the configuration @@ -207,147 +179,18 @@ public boolean init(String[] args) throws ParseException, IOException { "No. of containers on which the shell command needs to be executed"); opts.addOption(JOYConstants.PRIORITY, true, "Application Priority. Default 0"); opts.addOption(JOYConstants.DEBUG, false, "Dump out debug information"); - opts.addOption(JOYConstants.HELP, false, "Print usage"); - CommandLine cliParser = new GnuParser().parse(opts, args); if (args.length == 0) { printUsage(opts); throw new IllegalArgumentException( "No args specified for application master to initialize"); } - //Check whether customer log4j.properties file exists - if (fileExist(log4jPath)) { - try { - Log4jPropertyHelper.updateLog4jConfiguration(JstormMaster.class, - log4jPath); - } catch (Exception e) { - LOG.warn("Can not set up custom log4j properties. " + e); - } - } - - if (cliParser.hasOption(JOYConstants.DEBUG)) { - dumpOutDebugInfo(); - } - - Map envs = System.getenv(); - - if (!envs.containsKey(Environment.CONTAINER_ID.name())) { - if (cliParser.hasOption(JOYConstants.APP_ATTEMPT_ID)) { - String appIdStr = cliParser.getOptionValue(JOYConstants.APP_ATTEMPT_ID, JOYConstants.EMPTY); - jstormMasterContext.appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr); - } else { - throw new IllegalArgumentException( - "Application Attempt Id not set in the environment"); - } - } else { - ContainerId containerId = ConverterUtils.toContainerId(envs - .get(Environment.CONTAINER_ID.name())); - jstormMasterContext.appAttemptID = containerId.getApplicationAttemptId(); - } - - if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) { - throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV - + " not set in the environment"); - } - if (!envs.containsKey(Environment.NM_HOST.name())) { - throw new RuntimeException(Environment.NM_HOST.name() - + " not set in the environment"); - } - if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) { - throw new RuntimeException(Environment.NM_HTTP_PORT - + " not set in the environment"); - } - if (!envs.containsKey(Environment.NM_PORT.name())) { - throw new RuntimeException(Environment.NM_PORT.name() - + " not set in the environment"); - } - - LOG.info("Application master for app" + ", appId=" - + jstormMasterContext.appAttemptID.getApplicationId().getId() + ", clustertimestamp=" - + jstormMasterContext.appAttemptID.getApplicationId().getClusterTimestamp() - + ", attemptId=" + jstormMasterContext.appAttemptID.getAttemptId()); - - if (!fileExist(shellCommandPath) - && envs.get(JOYConstants.DISTRIBUTEDSHELLSCRIPTLOCATION).isEmpty()) { - throw new IllegalArgumentException( - "No shell command or shell script specified to be executed by application master"); - } - if (fileExist(shellCommandPath)) { - jstormMasterContext.shellCommand = readContent(shellCommandPath); - } - - if (fileExist(shellArgsPath)) { - jstormMasterContext.shellArgs = readContent(shellArgsPath); - } - - if (cliParser.hasOption(JOYConstants.SHELL_ENV)) { - String shellEnvs[] = cliParser.getOptionValues(JOYConstants.SHELL_ENV); - for (String env : shellEnvs) { - env = env.trim(); - int index = env.indexOf(JOYConstants.EQUAL); - if (index == -1) { - jstormMasterContext.shellEnv.put(env, JOYConstants.EMPTY); - continue; - } - String key = env.substring(0, index); - String val = JOYConstants.EMPTY; - if (index < (env.length() - 1)) { - val = env.substring(index + 1); - } - jstormMasterContext.shellEnv.put(key, val); - } - } - - if (envs.containsKey(JOYConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) { - jstormMasterContext.scriptPath = envs.get(JOYConstants.DISTRIBUTEDSHELLSCRIPTLOCATION); - - jstormMasterContext.appMasterJarPath = envs.get(JOYConstants.APPMASTERJARSCRIPTLOCATION); - if (envs.containsKey(JOYConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) { - jstormMasterContext.shellScriptPathTimestamp = Long.parseLong(envs - .get(JOYConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)); - jstormMasterContext.jarTimestamp = Long.parseLong(envs - .get(JOYConstants.APPMASTERTIMESTAMP)); - } - if (envs.containsKey(JOYConstants.DISTRIBUTEDSHELLSCRIPTLEN)) { - jstormMasterContext.shellScriptPathLen = Long.parseLong(envs - .get(JOYConstants.DISTRIBUTEDSHELLSCRIPTLEN)); - jstormMasterContext.jarPathLen = Long.parseLong(envs - .get(JOYConstants.APPMASTERLEN)); - } - - if (!jstormMasterContext.scriptPath.isEmpty() - && (jstormMasterContext.shellScriptPathTimestamp <= 0 || jstormMasterContext.shellScriptPathLen <= 0)) { - LOG.error("Illegal values in env for shell script path" + ", path=" - + jstormMasterContext.scriptPath + ", len=" + jstormMasterContext.shellScriptPathLen + ", timestamp=" - + jstormMasterContext.shellScriptPathTimestamp); - throw new IllegalArgumentException( - "Illegal values in env for shell script path"); - } - } - - if (envs.containsKey(JOYConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) { - jstormMasterContext.domainId = envs.get(JOYConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN); - } - - if (envs.containsKey(JOYConstants.BINARYFILEDEPLOYPATH) - && !envs.get(JOYConstants.BINARYFILEDEPLOYPATH).equals(JOYConstants.EMPTY)) { - conf.set(JOYConstants.INSTANCE_DEPLOY_DIR_KEY, envs.get(JOYConstants.BINARYFILEDEPLOYPATH)); - jstormMasterContext.deployPath = envs.get(JOYConstants.BINARYFILEDEPLOYPATH); - } - - if (envs.containsKey(JOYConstants.INSTANCENAME) - && !envs.get(JOYConstants.INSTANCENAME).equals(JOYConstants.EMPTY)) { - conf.set(JOYConstants.INSTANCE_NAME_KEY, envs.get(JOYConstants.INSTANCENAME)); - jstormMasterContext.instanceName = envs.get(JOYConstants.INSTANCENAME); - } - jstormMasterContext.containerVirtualCores = Integer.parseInt(cliParser.getOptionValue( - JOYConstants.CONTAINER_VCORES, JOYConstants.DEFAULT_CONTAINER_VCORES)); - jstormMasterContext.numTotalContainers = Integer.parseInt(cliParser.getOptionValue( - JOYConstants.NUM_CONTAINERS, JOYConstants.DEFAULT_NUM_CONTAINER)); - if (jstormMasterContext.numTotalContainers == 0) { - throw new IllegalArgumentException( - "Cannot run distributed shell with no containers"); + try { + CommandLine cliParser = new GnuParser().parse(opts, args); + JstormYarnUtils.checkAndSetMasterOptions(cliParser, jstormMasterContext, this.conf); + } catch (Exception e) { + LOG.error(e); } return true; } @@ -378,7 +221,6 @@ private void buildPortScanner() { @SuppressWarnings({"unchecked"}) public void run() throws Exception { LOG.info("Starting JstormMaster"); - Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); DataOutputBuffer dob = new DataOutputBuffer(); @@ -636,11 +478,9 @@ protected boolean finish() { } // Join all launched threads - // needed for when we time out - // and we need to release containers for (Thread launchThread : launchThreads) { try { - launchThread.join(10000); + launchThread.join(JOYConstants.JOIN_THREAD_TIMEOUT); } catch (InterruptedException e) { LOG.info("Exception thrown in thread join: " + e.getMessage()); e.printStackTrace(); @@ -717,11 +557,9 @@ public void onContainersCompleted(List completedContainers) { // increment counters for completed/failed containers int exitStatus = containerStatus.getExitStatus(); - if (0 != exitStatus) { + if (JOYConstants.EXIT_SUCCESS != exitStatus) { // container failed if (ContainerExitStatus.ABORTED != exitStatus) { - // shell script failed - // counts as completed jstormMasterContext.numCompletedContainers.incrementAndGet(); jstormMasterContext.numFailedContainers.incrementAndGet(); } else { @@ -729,8 +567,6 @@ public void onContainersCompleted(List completedContainers) { // we should re-try as the container was lost for some reason jstormMasterContext.numAllocatedContainers.decrementAndGet(); jstormMasterContext.numRequestedContainers.decrementAndGet(); - // we do not need to release the container as it would be done - // by the RM } if (nimbusMap.containsKey(containerId)) { @@ -841,13 +677,12 @@ public void onContainersAllocated(List allocatedContainers) { // so when nimbus container allocated we register nimbus's host, directory and containerId, pull previous nimbus // data from previous nimbus host if necessary. ServiceRecord serviceRecord = setupServiceRecord(); - jstormMasterContext.previousNimbusHost = ""; + jstormMasterContext.previousNimbusHost = JOYConstants.EMPTY; try { ServiceRecord sr = registryOperations.resolve(path); jstormMasterContext.previousNimbusHost = sr.get(JOYConstants.NIMBUS_HOST, JOYConstants.EMPTY); - LOG.info("previousNimbusHost is :" + jstormMasterContext.previousNimbusHost); - LOG.info("nimbusHost is :" + jstormMasterContext.nimbusHost); + LOG.info("previousNimbusHost is :" + jstormMasterContext.previousNimbusHost + "; nimbusHost is :" + jstormMasterContext.nimbusHost); // nimbus location register, then we can restart nimbus with no work loss serviceRecord.set(JOYConstants.NIMBUS_HOST, jstormMasterContext.nimbusHost); serviceRecord.set(JOYConstants.NIMBUS_LOCAL_DIR, jstormMasterContext.nimbusDataDirPrefix); @@ -1017,10 +852,7 @@ public void run() { // Set the local resources Map localResources = new HashMap(); - // The container for the eventual shell commands needs its own local - // resources too. - // In this scenario, if a shell script is specified, we need to have it - // copied and made available to the container. + // The container for the eventual shell commands needs its own local resources too. if (!jstormMasterContext.scriptPath.isEmpty()) { Path renamedScriptPath; if (Shell.WINDOWS) { @@ -1028,7 +860,6 @@ public void run() { } else { renamedScriptPath = new Path(jstormMasterContext.scriptPath + ".sh"); } - try { // rename the script file based on the underlying OS syntax. renameScriptFile(renamedScriptPath); @@ -1036,8 +867,7 @@ public void run() { LOG.error( "Not able to add suffix (.bat/.sh) to the shell script filename", e); - // We know we cannot continue launching the container - // so we should release it. + // we cannot continue launching the container. so release it. jstormMasterContext.numCompletedContainers.incrementAndGet(); jstormMasterContext.numFailedContainers.incrementAndGet(); return; @@ -1069,7 +899,7 @@ public void run() { FileStatus scriptStatus = fileSystem.getFileStatus(renamedScriptPath); jstormMasterContext.shellScriptPathLen = scriptStatus.getLen(); jstormMasterContext.shellScriptPathTimestamp = scriptStatus.getModificationTime(); - LOG.info("jarPathLen:" + jstormMasterContext.jarPathLen + " jarTimepstamp:" + jstormMasterContext.jarTimestamp); + LOG.info("jar len:" + jstormMasterContext.jarPathLen + " jar timespan:" + jstormMasterContext.jarTimestamp); } catch (IOException e) { LOG.error("get hdfs filestatus" @@ -1084,7 +914,7 @@ public void run() { LocalResource jarRsrc = LocalResource.newInstance(jarUrl, LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, jstormMasterContext.jarPathLen, jstormMasterContext.jarTimestamp); - localResources.put("AppMaster.jar", jarRsrc); + localResources.put(JOYConstants.appMasterJarPath, jarRsrc); LOG.info(shellRsrc.getResource().getFile()); LOG.info(jarRsrc.getResource().getFile()); @@ -1103,11 +933,11 @@ public void run() { : JOYConstants.ExecShellStringPath); } - String startTypeStr = "supervisor"; + String startTypeStr = JOYConstants.SUPERVISOR; // start type specified to be excute by shell script to start jstorm process if (startType == STARTType.NIMBUS) { - startTypeStr = "nimbus"; - vargs.add("nimbus"); + startTypeStr = JOYConstants.NIMBUS; + vargs.add(JOYConstants.NIMBUS); //put containerId in nimbus containers queue try { jstormMasterContext.nimbusContainers.put(this.container); @@ -1115,7 +945,7 @@ public void run() { e.printStackTrace(); } } else { - vargs.add("supervisor"); + vargs.add(JOYConstants.SUPERVISOR); try { jstormMasterContext.supervisorContainers.put(this.container); } catch (InterruptedException e) { @@ -1124,17 +954,18 @@ public void run() { } // pass instanceName for multiple instance deploy - jstormMasterContext.nimbusDataDirPrefix = conf.get("jstorm.yarn.instance.dataDir"); - String localDir = jstormMasterContext.nimbusDataDirPrefix + container.getId().toString() + "/" + jstormMasterContext.instanceName; + jstormMasterContext.nimbusDataDirPrefix = conf.get(JOYConstants.INSTANCE_DATA_DIR_KEY); + String localDir = jstormMasterContext.nimbusDataDirPrefix + container.getId().toString() + JOYConstants.BACKLASH + + jstormMasterContext.instanceName; vargs.add(localDir); vargs.add(jstormMasterContext.deployPath); //get superviorhost's free port SlotPortsView slotPortsView = new SlotPortsView(jstormMasterContext.instanceName, container.getId(), registryOperations); - slotPortsView.setMinPort(conf.getInt("jstorm.yarn.supervisor.minport", JOYConstants.PORT_RANGE_MIN)); - slotPortsView.setMaxPort(conf.getInt("jstorm.yarn.supervisor.maxport", JOYConstants.PORT_RANGE_MAX)); - String slotPortsStr = ""; + slotPortsView.setMinPort(conf.getInt(JOYConstants.SUPERVISOR_MIN_PORT_KEY, JOYConstants.PORT_RANGE_MIN)); + slotPortsView.setMaxPort(conf.getInt(JOYConstants.SUPERVISOR_MAX_PORT_KEY, JOYConstants.PORT_RANGE_MAX)); + String slotPortsStr = JOYConstants.EMPTY; try { slotPortsStr = slotPortsView.getSupervisorSlotPorts(container.getResource().getMemory(), container.getResource().getVirtualCores(), container.getNodeId().getHost()); @@ -1144,25 +975,24 @@ public void run() { LOG.error("failed get slot ports , container " + container.toString() + "launch fail", ex); return; } - String logviewPort = "8622"; - String nimbusThriftPort = "8627"; + String logviewPort = JOYConstants.DEFAULT_LOGVIEW_PORT; + String nimbusThriftPort = JOYConstants.DEFAULT_NIMBUS_THRIFT_PORT; try { - logviewPort = slotPortsView.getSupervisorSlotPorts(4110, - 1, container.getNodeId().getHost()); - nimbusThriftPort = slotPortsView.getSupervisorSlotPorts(4110, - 1, container.getNodeId().getHost()); -// supervisorLogviewPort = slotPortsView.getSetPortUsedBySupervisor(container.getNodeId().getHost(), 1).get(0); + logviewPort = slotPortsView.getSupervisorSlotPorts(JOYConstants.DEFAULT_SUPERVISOR_MEMORY, + JOYConstants.DEFAULT_SUPERVISOR_VCORES, container.getNodeId().getHost()); + nimbusThriftPort = slotPortsView.getSupervisorSlotPorts(JOYConstants.DEFAULT_SUPERVISOR_MEMORY, + JOYConstants.DEFAULT_SUPERVISOR_VCORES, container.getNodeId().getHost()); } catch (Exception e) { e.printStackTrace(); } - String hadoopHome = conf.get("jstorm.yarn.hadoop.home"); - String javaHome = conf.get("jstorm.yarn.java.home"); - String pythonHome = conf.get("jstorm.yarn.python.home"); + String hadoopHome = conf.get(JOYConstants.HADOOP_HOME_KEY); + String javaHome = conf.get(JOYConstants.JAVA_HOME_KEY); + String pythonHome = conf.get(JOYConstants.PYTHON_HOME_KEY); vargs.add(hadoopHome); vargs.add(javaHome);//$6 vargs.add(pythonHome);//$7 - String deployDst = conf.get("jstorm.yarn.instance.deploy.destination"); + String deployDst = conf.get(JOYConstants.INSTANCE_DEPLOY_DEST_KEY); if (deployDst == null) { deployDst = jstormMasterContext.nimbusDataDirPrefix; } @@ -1186,7 +1016,7 @@ public void run() { Map envs = System.getenv(); String exectorCommand = ExecutorLoader.loadCommand(jstormMasterContext.instanceName, jstormMasterContext.shellCommand, startTypeStr, this.container.getId().toString(), localDir, jstormMasterContext.deployPath, hadoopHome, javaHome, pythonHome, dstPath, slotPortsStr, jstormMasterContext.shellArgs, - envs.get("CLASSPATH"), JOYConstants.ExecShellStringPath, jstormMasterContext.appAttemptID.getApplicationId().toString(), logviewPort, nimbusThriftPort); + envs.get(JOYConstants.CLASS_PATH), JOYConstants.ExecShellStringPath, jstormMasterContext.appAttemptID.getApplicationId().toString(), logviewPort, nimbusThriftPort); exectorCommand = exectorCommand + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"; @@ -1251,19 +1081,6 @@ public ContainerRequest setupContainerAskForRM(int containerMemory, int containe return request; } - private boolean fileExist(String filePath) { - return new File(filePath).exists(); - } - - private String readContent(String filePath) throws IOException { - DataInputStream ds = null; - try { - ds = new DataInputStream(new FileInputStream(filePath)); - return ds.readUTF(); - } finally { - org.apache.commons.io.IOUtils.closeQuietly(ds); - } - } private static void publishContainerStartEvent( final TimelineClient timelineClient, Container container, String domainId, @@ -1272,12 +1089,12 @@ private static void publishContainerStartEvent( entity.setEntityId(container.getId().toString()); entity.setEntityType(DSEntity.DS_CONTAINER.toString()); entity.setDomainId(domainId); - entity.addPrimaryFilter("user", ugi.getShortUserName()); + entity.addPrimaryFilter(JOYConstants.USER, ugi.getShortUserName()); TimelineEvent event = new TimelineEvent(); event.setTimestamp(System.currentTimeMillis()); event.setEventType(DSEvent.DS_CONTAINER_START.toString()); - event.addEventInfo("Node", container.getNodeId().toString()); - event.addEventInfo("Resources", container.getResource().toString()); + event.addEventInfo(JOYConstants.NODE, container.getNodeId().toString()); + event.addEventInfo(JOYConstants.RESOURCES, container.getResource().toString()); entity.addEvent(event); try { @@ -1301,12 +1118,12 @@ private static void publishContainerEndEvent( entity.setEntityId(container.getContainerId().toString()); entity.setEntityType(DSEntity.DS_CONTAINER.toString()); entity.setDomainId(domainId); - entity.addPrimaryFilter("user", ugi.getShortUserName()); + entity.addPrimaryFilter(JOYConstants.USER, ugi.getShortUserName()); TimelineEvent event = new TimelineEvent(); event.setTimestamp(System.currentTimeMillis()); event.setEventType(DSEvent.DS_CONTAINER_END.toString()); - event.addEventInfo("State", container.getState().name()); - event.addEventInfo("Exit Status", container.getExitStatus()); + event.addEventInfo(JOYConstants.STATE, container.getState().name()); + event.addEventInfo(JOYConstants.EXIT_STATE, container.getExitStatus()); entity.addEvent(event); try { timelineClient.putEntities(entity); @@ -1326,7 +1143,7 @@ private static void publishApplicationAttemptEvent( entity.setEntityId(appAttemptId); entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString()); entity.setDomainId(domainId); - entity.addPrimaryFilter("user", ugi.getShortUserName()); + entity.addPrimaryFilter(JOYConstants.USER, ugi.getShortUserName()); TimelineEvent event = new TimelineEvent(); event.setEventType(appEvent.toString()); event.setTimestamp(System.currentTimeMillis()); @@ -1335,12 +1152,12 @@ private static void publishApplicationAttemptEvent( timelineClient.putEntities(entity); } catch (YarnException e) { LOG.error("App Attempt " - + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end") + + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? JOYConstants.START : JOYConstants.END) + " event could not be published for " + appAttemptId.toString(), e); } catch (IOException e) { LOG.error("App Attempt " - + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end") + + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? JOYConstants.START : JOYConstants.END) + " event could not be published for " + appAttemptId.toString(), e); } diff --git a/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/constants/JOYConstants.java b/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/constants/JOYConstants.java index 995716341..a25796d7a 100644 --- a/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/constants/JOYConstants.java +++ b/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/constants/JOYConstants.java @@ -105,6 +105,8 @@ public class JOYConstants { public static final String RM_ADDRESS_KEY = "yarn.resourcemanager.address"; public static final String INSTANCE_DEPLOY_DIR_KEY = "jstorm.yarn.instance.deploy.dir"; public static final String INSTANCE_NAME_KEY = "jstorm.yarn.instance.name"; + public static final String SUPERVISOR_MIN_PORT_KEY = "jstorm.yarn.supervisor.minport"; + public static final String SUPERVISOR_MAX_PORT_KEY = "jstorm.yarn.supervisor.maxport"; public static final String HADOOP_REGISTRY_ZK_RETRY_INTERVAL_MS = "hadoop.registry.zk.retry.interval.ms"; public static final String INSTANCE_DATA_DIR_KEY = "jstorm.yarn.instance.dataDir"; public static final String JSTORM_YARN_USER = "jstorm.yarn.user"; @@ -119,6 +121,8 @@ public class JOYConstants { public static final String windows_command = "cmd /c"; public static final Integer EXIT_SUCCESS = 0; public static final Integer EXIT_FAIL = -1; + public static final Integer EXIT_FAIL1 = 1; + public static final Integer EXIT_FAIL2 = 2; /** * application master @@ -126,7 +130,23 @@ public class JOYConstants { public static final String NIMBUS_HOST = "nimbus.host"; public static final String NIMBUS_CONTAINER = "nimbus.containerId"; public static final String NIMBUS_LOCAL_DIR = "nimbus.localdir"; + public static final String DEFAULT_LOGVIEW_PORT = "8622"; + public static final String DEFAULT_NIMBUS_THRIFT_PORT = "8627"; + public static final Integer DEFAULT_SUPERVISOR_MEMORY = 4110; + public static final Integer DEFAULT_SUPERVISOR_VCORES = 1; public static final Integer AM_RM_CLIENT_INTERVAL = 1000; + public static final String HADOOP_HOME_KEY = "jstorm.yarn.hadoop.home"; + public static final String JAVA_HOME_KEY = "jstorm.yarn.java.home"; + public static final String PYTHON_HOME_KEY = "jstorm.yarn.python.home"; + public static final String INSTANCE_DEPLOY_DEST_KEY = "jstorm.yarn.instance.deploy.destination"; + public static final String USER = "user"; + public static final String NODE = "Node"; + public static final String RESOURCES = "Resources"; + public static final String STATE = "State"; + public static final String EXIT_STATE = "Exit Status"; + public static final String START = "start"; + public static final String END = "end"; + public static final Integer JOIN_THREAD_TIMEOUT = 10000; /** * port view @@ -136,12 +156,17 @@ public class JOYConstants { public static final String PORT = "port"; public static final String AM = "am"; public static final String CTIME = "cTime"; + public static final String DEFAULT_CTIME = "0"; public static final String PORT_LIST = "portList"; public static final String CONTAINER = "container"; public static final String PORT_RANGE = "9111-9999"; public static final String HTTP = "http"; public static final String HOST_PORT = "host/port"; public static final String RPC = "rpc"; + public static final Integer SLEEP_INTERVAL = 1000; + public static final Integer RETRY_TIMES = 45; + public static final Double JSTORM_MEMORY_WEIGHT = 4096.0; + public static final Double JSTORM_VCORE_WEIGHT = 1.2; /** * cli options diff --git a/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/context/JstormMasterContext.java b/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/context/JstormMasterContext.java index f610f0db7..5f7caa3bd 100644 --- a/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/context/JstormMasterContext.java +++ b/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/context/JstormMasterContext.java @@ -189,4 +189,9 @@ public BlockingQueue getContainerRequest() { public Set getUpgradingContainerIds() { return null; } + + @Override + public Map getShellEnv() { + return shellEnv; + } } diff --git a/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/context/MasterContext.java b/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/context/MasterContext.java index 03495f6e0..8b95c062a 100644 --- a/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/context/MasterContext.java +++ b/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/context/MasterContext.java @@ -8,6 +8,8 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.client.api.AMRMClient; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; @@ -43,4 +45,6 @@ public interface MasterContext { Set getUpgradingContainerIds(); + Map getShellEnv(); + } diff --git a/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/registry/SlotPortsView.java b/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/registry/SlotPortsView.java index b06cbd716..1fefe02f6 100644 --- a/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/registry/SlotPortsView.java +++ b/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/registry/SlotPortsView.java @@ -45,7 +45,7 @@ public SlotPortsView(String instanceName, ContainerId containerId, RegistryOpera public List getSetPortUsedBySupervisor(String supervisorHost, int slotCount) throws Exception { String appPath = RegistryUtils.serviceclassPath( - JOYConstants.APP_TYPE, ""); + JOYConstants.APP_TYPE, JOYConstants.EMPTY); String path = RegistryUtils.serviceclassPath( JOYConstants.APP_TYPE, instanceName); @@ -125,7 +125,6 @@ public List getSetPortUsedBySupervisor(String supervisorHost, int slotCo sr.description = JOYConstants.CONTAINER; sr.set(YarnRegistryAttributes.YARN_PERSISTENCE, PersistencePolicies.CONTAINER); - registryOperations.bind(containerPath, sr, BindFlags.OVERWRITE); } return reList; @@ -147,9 +146,9 @@ public String getSupervisorSlotPorts(int memory, int vcores, String supervisorHo LOG.info("slotCount:" + slotCount); relist = getSetPortUsedBySupervisor(supervisorHost, slotCount); - LOG.info("get ports string:" + JstormYarnUtils.join(relist, ",", false)); + LOG.info("get ports string:" + JstormYarnUtils.join(relist, JOYConstants.COMMA, false)); - return JstormYarnUtils.join(relist, ",", false); + return JstormYarnUtils.join(relist, JOYConstants.COMMA, false); } catch (Exception e) { LOG.error(e); throw e; @@ -160,10 +159,9 @@ public String getSupervisorSlotPorts(int memory, int vcores, String supervisorHo //todo: cause we don't support cgroup yet,now vcores is useless private int getSlotCount(int memory, int vcores) { - int cpuports = (int) Math.ceil(vcores / 1.2); - int memoryports = (int) Math.floor(memory / 4096.0); + int cpuports = (int) Math.ceil(vcores / JOYConstants.JSTORM_VCORE_WEIGHT); + int memoryports = (int) Math.floor(memory / JOYConstants.JSTORM_MEMORY_WEIGHT); // return cpuports > memoryports ? memoryports : cpuports; - // doesn't support cgroup yet return memoryports; } @@ -177,12 +175,11 @@ private int getSlotCount(int memory, int vcores) { */ private void tryHostLock(String hostPath) throws Exception { - //if path has created 60 seconds ago, then delete if (registryOperations.exists(hostPath)) { try { ServiceRecord host = registryOperations.resolve(hostPath); - Long cTime = Long.parseLong(host.get(JOYConstants.CTIME, "0")); + Long cTime = Long.parseLong(host.get(JOYConstants.CTIME, JOYConstants.DEFAULT_CTIME)); Date now = new Date(); if (now.getTime() - cTime > JOYConstants.HOST_LOCK_TIMEOUT || cTime > now.getTime()) registryOperations.delete(hostPath, true); @@ -191,9 +188,9 @@ private void tryHostLock(String hostPath) throws Exception { } } - int failedCount = 45; + int failedCount = JOYConstants.RETRY_TIMES; while (!registryOperations.mknode(hostPath, true)) { - Thread.sleep(1000); + Thread.sleep(JOYConstants.SLEEP_INTERVAL); failedCount--; if (failedCount <= 0) break; diff --git a/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/utils/JstormYarnUtils.java b/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/utils/JstormYarnUtils.java index 4a961b7f0..ce8f09f3f 100644 --- a/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/utils/JstormYarnUtils.java +++ b/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/utils/JstormYarnUtils.java @@ -1,24 +1,42 @@ package com.alibaba.jstorm.yarn.utils; +import com.alibaba.jstorm.yarn.JstormOnYarn; +import com.alibaba.jstorm.yarn.Log4jPropertyHelper; +import com.alibaba.jstorm.yarn.appmaster.JstormMaster; import com.alibaba.jstorm.yarn.constants.JOYConstants; import com.alibaba.jstorm.yarn.constants.JstormXmlConfKeys; +import com.alibaba.jstorm.yarn.context.JstormClientContext; +import com.alibaba.jstorm.yarn.context.JstormMasterContext; +import com.alibaba.jstorm.yarn.context.MasterContext; +import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; +import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.registry.client.api.BindFlags; import org.apache.hadoop.registry.client.api.RegistryOperations; import org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.util.Shell; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.ConverterUtils; import java.io.*; import java.net.ServerSocket; import java.net.Socket; import java.util.*; +import static com.alibaba.jstorm.yarn.constants.JOYConstants.log4jPath; +import static com.alibaba.jstorm.yarn.constants.JOYConstants.shellArgsPath; +import static com.alibaba.jstorm.yarn.constants.JOYConstants.shellCommandPath; + /** * Created by fengjian on 15/12/29. */ @@ -399,4 +417,302 @@ public static void getYarnConfFromJar(String jarPath) { "No configuration file specified to be executed by application master to launch process"); } } + + public static void checkAndSetOptions(CommandLine cliParser, JstormClientContext jstormClientContext) { + if (cliParser.hasOption(JOYConstants.DEBUG)) { + jstormClientContext.debugFlag = true; + } + if (cliParser.hasOption(JOYConstants.KEEP_CONTAINERS_ACROSS_APPLICATION_ATTEMPTS)) { + jstormClientContext.keepContainers = true; + } + if (cliParser.hasOption(JOYConstants.LOG_PROPERTIES)) { + String log4jPath = cliParser.getOptionValue(JOYConstants.LOG_PROPERTIES); + try { + Log4jPropertyHelper.updateLog4jConfiguration(JstormOnYarn.class, log4jPath); + } catch (Exception e) { + LOG.warn("Can not set up custom log4j properties. " + e); + } + } + if (jstormClientContext.amMemory < 0) { + throw new IllegalArgumentException("Invalid memory specified for application master, exiting." + + " Specified memory=" + jstormClientContext.amMemory); + } + if (jstormClientContext.amVCores < 0) { + throw new IllegalArgumentException("Invalid virtual cores specified for application master, exiting." + + " Specified virtual cores=" + jstormClientContext.amVCores); + } + if (!cliParser.hasOption(JOYConstants.JAR)) { + throw new IllegalArgumentException("No jar file specified for application master"); + } + if (!jstormClientContext.rmHost.equals(JOYConstants.EMPTY)) { + jstormClientContext.conf.set(JOYConstants.RM_ADDRESS_KEY, jstormClientContext.rmHost, JOYConstants.YARN_CONF_MODE); + } + if (!jstormClientContext.nameNodeHost.equals(JOYConstants.EMPTY)) { + jstormClientContext.conf.set(JOYConstants.FS_DEFAULTFS_KEY, jstormClientContext.nameNodeHost); + } + if (!StringUtils.isBlank(jstormClientContext.hadoopConfDir)) { + try { + Collection files = FileUtils.listFiles(new File(jstormClientContext.hadoopConfDir), new String[]{JOYConstants.XML}, true); + for (File file : files) { + LOG.info("adding hadoop conf file to conf: " + file.getAbsolutePath()); + jstormClientContext.conf.addResource(file.getAbsolutePath()); + } + } catch (Exception ex) { + LOG.error("failed to list hadoop conf dir: " + jstormClientContext.hadoopConfDir); + } + } + String jarPath = JstormOnYarn.class.getProtectionDomain() + .getCodeSource().getLocation().getPath(); + if (jstormClientContext.confFile == null) { + JstormYarnUtils.getYarnConfFromJar(jarPath); + jstormClientContext.conf.addResource(JOYConstants.CONF_NAME); + } else { + Path jstormyarnConfPath = new Path(jstormClientContext.confFile); + jstormClientContext.conf.addResource(jstormyarnConfPath); + } + if (!cliParser.hasOption(JOYConstants.SHELL_SCRIPT)) { + String jarShellScriptPath = jarPath + JOYConstants.START_JSTORM_SHELL; + try { + InputStream stream = new FileInputStream(jarShellScriptPath); + FileOutputStream out = new FileOutputStream(JOYConstants.START_JSTORM_SHELL); + out.write(IOUtils.toByteArray(stream)); + out.close(); + jstormClientContext.shellScriptPath = JOYConstants.START_JSTORM_SHELL; + } catch (Exception e) { + throw new IllegalArgumentException( + "No shell script specified to be executed by application master to start nimbus and supervisor"); + } + } else if (cliParser.hasOption(JOYConstants.SHELL_COMMAND) && cliParser.hasOption(JOYConstants.SHELL_SCRIPT)) { + throw new IllegalArgumentException("Can not specify shell_command option " + + "and shell_script option at the same time"); + } else if (cliParser.hasOption(JOYConstants.SHELL_COMMAND)) { + jstormClientContext.shellCommand = cliParser.getOptionValue(JOYConstants.SHELL_COMMAND); + } else { + jstormClientContext.shellScriptPath = cliParser.getOptionValue(JOYConstants.SHELL_SCRIPT); + } + if (cliParser.hasOption(JOYConstants.SHELL_ARGS)) { + jstormClientContext.shellArgs = cliParser.getOptionValues(JOYConstants.SHELL_ARGS); + } + setShellEnv(cliParser, jstormClientContext); + jstormClientContext.shellCmdPriority = Integer.parseInt(cliParser.getOptionValue(JOYConstants.SHELL_CMD_PRIORITY, JOYConstants.SHELL_CMD_PRIORITY_DEFAULT_VALUE)); + //set AM memory default to 1000mb + jstormClientContext.containerMemory = Integer.parseInt(cliParser.getOptionValue(JOYConstants.CONTAINER_MEMORY, JOYConstants.DEFAULT_CONTAINER_MEMORY)); + jstormClientContext.containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(JOYConstants.CONTAINER_VCORES, JOYConstants.DEFAULT_CONTAINER_VCORES)); + jstormClientContext.numContainers = Integer.parseInt(cliParser.getOptionValue(JOYConstants.NUM_CONTAINERS, JOYConstants.DEFAULT_NUM_CONTAINER)); + + if (jstormClientContext.containerMemory < 0 || jstormClientContext.containerVirtualCores < 0 || jstormClientContext.numContainers < 1) { + throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified," + + " exiting." + + " Specified containerMemory=" + jstormClientContext.containerMemory + + ", containerVirtualCores=" + jstormClientContext.containerVirtualCores + + ", numContainer=" + jstormClientContext.numContainers); + } + + jstormClientContext.nodeLabelExpression = cliParser.getOptionValue(JOYConstants.NODE_LABEL_EXPRESSION, null); + jstormClientContext.clientTimeout = Integer.parseInt(cliParser.getOptionValue(JOYConstants.TIMEOUT, JOYConstants.DEFAULT_CLIENT_TIME_OUT)); + + jstormClientContext.attemptFailuresValidityInterval = + Long.parseLong(cliParser.getOptionValue( + JOYConstants.ATTEMPT_FAILURES_VALIDITY_INTERVAL, JOYConstants.DEFAULT_ATTEMPT_FAILURES_VALIDITY_INTERVAL)); + + jstormClientContext.log4jPropFile = cliParser.getOptionValue(JOYConstants.LOG_PROPERTIES, JOYConstants.EMPTY); + + // Get timeline domain options + if (cliParser.hasOption(JOYConstants.DOMAIN)) { + jstormClientContext.domainId = cliParser.getOptionValue(JOYConstants.DOMAIN); + jstormClientContext.toCreateDomain = cliParser.hasOption(JOYConstants.CREATE); + if (cliParser.hasOption(JOYConstants.VIEW_ACLS)) { + jstormClientContext.viewACLs = cliParser.getOptionValue(JOYConstants.VIEW_ACLS); + } + if (cliParser.hasOption(JOYConstants.MODIFY_ACLS)) { + jstormClientContext.modifyACLs = cliParser.getOptionValue(JOYConstants.MODIFY_ACLS); + } + } + } + + public static void setShellEnv(CommandLine cliParser, MasterContext jstormContext) { + if (cliParser.hasOption(JOYConstants.SHELL_ENV)) { + String envs[] = cliParser.getOptionValues(JOYConstants.SHELL_ENV); + for (String env : envs) { + env = env.trim(); + int index = env.indexOf(JOYConstants.EQUAL); + if (index == -1) { + jstormContext.getShellEnv().put(env, JOYConstants.EMPTY); + continue; + } + String key = env.substring(0, index); + String val = JOYConstants.EMPTY; + if (index < (env.length() - 1)) { + val = env.substring(index + 1); + } + jstormContext.getShellEnv().put(key, val); + } + } + } + + private static boolean fileExist(String filePath) { + return new File(filePath).exists(); + } + + /** + * Dump out contents of $CWD and the environment to stdout for debugging + */ + private static void dumpOutDebugInfo() { + + LOG.info("Dump debug output"); + Map envs = System.getenv(); + for (Map.Entry env : envs.entrySet()) { + LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue()); + System.out.println("System env: key=" + env.getKey() + ", val=" + + env.getValue()); + } + + BufferedReader buf = null; + try { + String lines = Shell.WINDOWS ? Shell.execCommand("cmd", "/c", "dir") : + Shell.execCommand("ls", "-al"); + buf = new BufferedReader(new StringReader(lines)); + String line = ""; + while ((line = buf.readLine()) != null) { + LOG.info("System CWD content: " + line); + System.out.println("System CWD content: " + line); + } + } catch (IOException e) { + e.printStackTrace(); + } finally { + org.apache.hadoop.io.IOUtils.cleanup(LOG, buf); + } + } + + private static String readContent(String filePath) throws IOException { + DataInputStream ds = null; + try { + ds = new DataInputStream(new FileInputStream(filePath)); + return ds.readUTF(); + } finally { + org.apache.commons.io.IOUtils.closeQuietly(ds); + } + } + + public static void checkAndSetMasterOptions(CommandLine cliParser, JstormMasterContext jstormMasterContext, Configuration conf) throws Exception { + + //Check whether customer log4j.properties file exists + if (fileExist(log4jPath)) { + try { + Log4jPropertyHelper.updateLog4jConfiguration(JstormMaster.class, + log4jPath); + } catch (Exception e) { + LOG.warn("Can not set up custom log4j properties. " + e); + } + } + + if (cliParser.hasOption(JOYConstants.DEBUG)) { + dumpOutDebugInfo(); + } + + Map envs = System.getenv(); + + if (!envs.containsKey(ApplicationConstants.Environment.CONTAINER_ID.name())) { + if (cliParser.hasOption(JOYConstants.APP_ATTEMPT_ID)) { + String appIdStr = cliParser.getOptionValue(JOYConstants.APP_ATTEMPT_ID, JOYConstants.EMPTY); + jstormMasterContext.appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr); + } else { + throw new IllegalArgumentException( + "Application Attempt Id not set in the environment"); + } + } else { + ContainerId containerId = ConverterUtils.toContainerId(envs + .get(ApplicationConstants.Environment.CONTAINER_ID.name())); + jstormMasterContext.appAttemptID = containerId.getApplicationAttemptId(); + } + + if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) { + throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV + + " not set in the environment"); + } + if (!envs.containsKey(ApplicationConstants.Environment.NM_HOST.name())) { + throw new RuntimeException(ApplicationConstants.Environment.NM_HOST.name() + + " not set in the environment"); + } + if (!envs.containsKey(ApplicationConstants.Environment.NM_HTTP_PORT.name())) { + throw new RuntimeException(ApplicationConstants.Environment.NM_HTTP_PORT + + " not set in the environment"); + } + if (!envs.containsKey(ApplicationConstants.Environment.NM_PORT.name())) { + throw new RuntimeException(ApplicationConstants.Environment.NM_PORT.name() + + " not set in the environment"); + } + + LOG.info("Application master for app" + ", appId=" + + jstormMasterContext.appAttemptID.getApplicationId().getId() + ", clustertimestamp=" + + jstormMasterContext.appAttemptID.getApplicationId().getClusterTimestamp() + + ", attemptId=" + jstormMasterContext.appAttemptID.getAttemptId()); + + if (!fileExist(shellCommandPath) + && envs.get(JOYConstants.DISTRIBUTEDSHELLSCRIPTLOCATION).isEmpty()) { + throw new IllegalArgumentException( + "No shell command or shell script specified to be executed by application master"); + } + + if (fileExist(shellCommandPath)) { + jstormMasterContext.shellCommand = readContent(shellCommandPath); + } + + if (fileExist(shellArgsPath)) { + jstormMasterContext.shellArgs = readContent(shellArgsPath); + } + + JstormYarnUtils.setShellEnv(cliParser, jstormMasterContext); + + if (envs.containsKey(JOYConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) { + jstormMasterContext.scriptPath = envs.get(JOYConstants.DISTRIBUTEDSHELLSCRIPTLOCATION); + + jstormMasterContext.appMasterJarPath = envs.get(JOYConstants.APPMASTERJARSCRIPTLOCATION); + if (envs.containsKey(JOYConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) { + jstormMasterContext.shellScriptPathTimestamp = Long.parseLong(envs + .get(JOYConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)); + jstormMasterContext.jarTimestamp = Long.parseLong(envs + .get(JOYConstants.APPMASTERTIMESTAMP)); + } + if (envs.containsKey(JOYConstants.DISTRIBUTEDSHELLSCRIPTLEN)) { + jstormMasterContext.shellScriptPathLen = Long.parseLong(envs + .get(JOYConstants.DISTRIBUTEDSHELLSCRIPTLEN)); + jstormMasterContext.jarPathLen = Long.parseLong(envs + .get(JOYConstants.APPMASTERLEN)); + } + + if (!jstormMasterContext.scriptPath.isEmpty() + && (jstormMasterContext.shellScriptPathTimestamp <= 0 || jstormMasterContext.shellScriptPathLen <= 0)) { + LOG.error("Illegal values in env for shell script path" + ", path=" + + jstormMasterContext.scriptPath + ", len=" + jstormMasterContext.shellScriptPathLen + ", timestamp=" + + jstormMasterContext.shellScriptPathTimestamp); + throw new IllegalArgumentException( + "Illegal values in env for shell script path"); + } + } + + if (envs.containsKey(JOYConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) { + jstormMasterContext.domainId = envs.get(JOYConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN); + } + + if (envs.containsKey(JOYConstants.BINARYFILEDEPLOYPATH) + && !envs.get(JOYConstants.BINARYFILEDEPLOYPATH).equals(JOYConstants.EMPTY)) { + conf.set(JOYConstants.INSTANCE_DEPLOY_DIR_KEY, envs.get(JOYConstants.BINARYFILEDEPLOYPATH)); + jstormMasterContext.deployPath = envs.get(JOYConstants.BINARYFILEDEPLOYPATH); + } + + if (envs.containsKey(JOYConstants.INSTANCENAME) + && !envs.get(JOYConstants.INSTANCENAME).equals(JOYConstants.EMPTY)) { + conf.set(JOYConstants.INSTANCE_NAME_KEY, envs.get(JOYConstants.INSTANCENAME)); + jstormMasterContext.instanceName = envs.get(JOYConstants.INSTANCENAME); + } + jstormMasterContext.containerVirtualCores = Integer.parseInt(cliParser.getOptionValue( + JOYConstants.CONTAINER_VCORES, JOYConstants.DEFAULT_CONTAINER_VCORES)); + jstormMasterContext.numTotalContainers = Integer.parseInt(cliParser.getOptionValue( + JOYConstants.NUM_CONTAINERS, JOYConstants.DEFAULT_NUM_CONTAINER)); + if (jstormMasterContext.numTotalContainers == 0) { + throw new IllegalArgumentException( + "Cannot run distributed shell with no containers"); + } + } }