diff options
Diffstat (limited to 'third_party/hadoop-0.20.0/contrib/hod/hodlib/Hod/hod.py')
-rw-r--r-- | third_party/hadoop-0.20.0/contrib/hod/hodlib/Hod/hod.py | 754 |
1 files changed, 0 insertions, 754 deletions
diff --git a/third_party/hadoop-0.20.0/contrib/hod/hodlib/Hod/hod.py b/third_party/hadoop-0.20.0/contrib/hod/hodlib/Hod/hod.py deleted file mode 100644 index b2587bb77a..0000000000 --- a/third_party/hadoop-0.20.0/contrib/hod/hodlib/Hod/hod.py +++ /dev/null @@ -1,754 +0,0 @@ -#Licensed to the Apache Software Foundation (ASF) under one -#or more contributor license agreements. See the NOTICE file -#distributed with this work for additional information -#regarding copyright ownership. The ASF licenses this file -#to you under the Apache License, Version 2.0 (the -#"License"); you may not use this file except in compliance -#with the License. You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -#Unless required by applicable law or agreed to in writing, software -#distributed under the License is distributed on an "AS IS" BASIS, -#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -#See the License for the specific language governing permissions and -#limitations under the License. -# -*- python -*- - -import sys, os, getpass, pprint, re, cPickle, random, shutil, time, errno - -import hodlib.Common.logger - -from hodlib.ServiceRegistry.serviceRegistry import svcrgy -from hodlib.Common.xmlrpc import hodXRClient -from hodlib.Common.util import to_http_url, get_exception_string -from hodlib.Common.util import get_exception_error_string -from hodlib.Common.util import hodInterrupt, HodInterruptException -from hodlib.Common.util import HOD_INTERRUPTED_CODE - -from hodlib.Common.nodepoolutil import NodePoolUtil -from hodlib.Hod.hadoop import hadoopCluster, hadoopScript - -CLUSTER_DATA_FILE = 'clusters' -INVALID_STATE_FILE_MSGS = \ - [ - - "Requested operation cannot be performed. Cannot read %s: " + \ - "Permission denied.", - - "Requested operation cannot be performed. " + \ - "Cannot write to %s: Permission denied.", - - "Requested operation cannot be performed. " + \ - "Cannot read/write to %s: Permission denied.", - - "Cannot update %s: Permission denied. " + \ - "Cluster is deallocated, but info and list " + \ - "operations might show incorrect information.", - - ] - -class hodState: - def __init__(self, store): - self.__store = store - self.__stateFile = None - self.__init_store() - self.__STORE_EXT = ".state" - - def __init_store(self): - if not os.path.exists(self.__store): - os.mkdir(self.__store) - - def __set_state_file(self, id=None): - if id: - self.__stateFile = os.path.join(self.__store, "%s%s" % (id, - self.__STORE_EXT)) - else: - for item in os.listdir(self.__store): - if item.endswith(self.__STORE_EXT): - self.__stateFile = os.path.join(self.__store, item) - - def get_state_file(self): - return self.__stateFile - - def checkStateFile(self, id=None, modes=(os.R_OK,)): - # is state file exists/readable/writable/both? - self.__set_state_file(id) - - # return true if file doesn't exist, because HOD CAN create - # state file and so WILL have permissions to read and/or write - try: - os.stat(self.__stateFile) - except OSError, err: - if err.errno == errno.ENOENT: # error 2 (no such file) - return True - - # file exists - ret = True - for mode in modes: - ret = ret and os.access(self.__stateFile, mode) - return ret - - def read(self, id=None): - info = {} - - self.__set_state_file(id) - - if self.__stateFile: - if os.path.isfile(self.__stateFile): - stateFile = open(self.__stateFile, 'r') - try: - info = cPickle.load(stateFile) - except EOFError: - pass - - stateFile.close() - - return info - - def write(self, id, info): - self.__set_state_file(id) - if not os.path.exists(self.__stateFile): - self.clear(id) - - stateFile = open(self.__stateFile, 'w') - cPickle.dump(info, stateFile) - stateFile.close() - - def clear(self, id=None): - self.__set_state_file(id) - if self.__stateFile and os.path.exists(self.__stateFile): - os.remove(self.__stateFile) - else: - for item in os.listdir(self.__store): - if item.endswith(self.__STORE_EXT): - os.remove(item) - -class hodRunner: - - def __init__(self, cfg, log=None, cluster=None): - self.__hodhelp = hodHelp() - self.__ops = self.__hodhelp.ops - self.__cfg = cfg - self.__npd = self.__cfg['nodepooldesc'] - self.__opCode = 0 - self.__user = getpass.getuser() - self.__registry = None - self.__baseLogger = None - # Allowing to pass in log object to help testing - a stub can be passed in - if log is None: - self.__setup_logger() - else: - self.__log = log - - self.__userState = hodState(self.__cfg['hod']['user_state']) - - self.__clusterState = None - self.__clusterStateInfo = { 'env' : None, 'hdfs' : None, 'mapred' : None } - - # Allowing to pass in log object to help testing - a stib can be passed in - if cluster is None: - self.__cluster = hadoopCluster(self.__cfg, self.__log) - else: - self.__cluster = cluster - - def __setup_logger(self): - self.__baseLogger = hodlib.Common.logger.hodLog('hod') - self.__log = self.__baseLogger.add_logger(self.__user ) - - if self.__cfg['hod']['stream']: - self.__baseLogger.add_stream(level=self.__cfg['hod']['debug'], - addToLoggerNames=(self.__user ,)) - - if self.__cfg['hod'].has_key('syslog-address'): - self.__baseLogger.add_syslog(self.__cfg['hod']['syslog-address'], - level=self.__cfg['hod']['debug'], - addToLoggerNames=(self.__user ,)) - - def get_logger(self): - return self.__log - - def __setup_cluster_logger(self, directory): - self.__baseLogger.add_file(logDirectory=directory, level=4, - backupCount=self.__cfg['hod']['log-rollover-count'], - addToLoggerNames=(self.__user ,)) - - def __setup_cluster_state(self, directory): - self.__clusterState = hodState(directory) - - def __norm_cluster_dir(self, directory): - directory = os.path.expanduser(directory) - if not os.path.isabs(directory): - directory = os.path.join(self.__cfg['hod']['original-dir'], directory) - directory = os.path.abspath(directory) - - return directory - - def __setup_service_registry(self): - cfg = self.__cfg['hod'].copy() - cfg['debug'] = 0 - self.__registry = svcrgy(cfg, self.__log) - self.__registry.start() - self.__log.debug(self.__registry.getXMLRPCAddr()) - self.__cfg['hod']['xrs-address'] = self.__registry.getXMLRPCAddr() - self.__cfg['ringmaster']['svcrgy-addr'] = self.__cfg['hod']['xrs-address'] - - def __set_cluster_state_info(self, env, hdfs, mapred, ring, jobid, min, max): - self.__clusterStateInfo['env'] = env - self.__clusterStateInfo['hdfs'] = "http://%s" % hdfs - self.__clusterStateInfo['mapred'] = "http://%s" % mapred - self.__clusterStateInfo['ring'] = ring - self.__clusterStateInfo['jobid'] = jobid - self.__clusterStateInfo['min'] = min - self.__clusterStateInfo['max'] = max - - def __set_user_state_info(self, info): - userState = self.__userState.read(CLUSTER_DATA_FILE) - for key in info.keys(): - userState[key] = info[key] - - self.__userState.write(CLUSTER_DATA_FILE, userState) - - def __remove_cluster(self, clusterDir): - clusterInfo = self.__userState.read(CLUSTER_DATA_FILE) - if clusterDir in clusterInfo: - del(clusterInfo[clusterDir]) - self.__userState.write(CLUSTER_DATA_FILE, clusterInfo) - - def __cleanup(self): - if self.__registry: self.__registry.stop() - - def __check_operation(self, operation): - opList = operation.split() - - if not opList[0] in self.__ops: - self.__log.critical("Invalid hod operation specified: %s" % operation) - self._op_help(None) - self.__opCode = 2 - - return opList - - def __adjustMasterFailureCountConfig(self, nodeCount): - # This method adjusts the ringmaster.max-master-failures variable - # to a value that is bounded by the a function of the number of - # nodes. - - maxFailures = self.__cfg['ringmaster']['max-master-failures'] - # Count number of masters required - depends on which services - # are external - masters = 0 - if not self.__cfg['gridservice-hdfs']['external']: - masters += 1 - if not self.__cfg['gridservice-mapred']['external']: - masters += 1 - - # So, if there are n nodes and m masters, we look atleast for - # all masters to come up. Therefore, atleast m nodes should be - # good, which means a maximum of n-m master nodes can fail. - maxFailedNodes = nodeCount - masters - - # The configured max number of failures is now bounded by this - # number. - self.__cfg['ringmaster']['max-master-failures'] = \ - min(maxFailures, maxFailedNodes) - - def _op_allocate(self, args): - operation = "allocate" - argLength = len(args) - min = 0 - max = 0 - errorFlag = False - errorMsgs = [] - - if argLength == 3: - nodes = args[2] - clusterDir = self.__norm_cluster_dir(args[1]) - - if not os.path.exists(clusterDir): - try: - os.makedirs(clusterDir) - except OSError, err: - errorFlag = True - errorMsgs.append("Could not create cluster directory. %s" \ - % (str(err))) - elif not os.path.isdir(clusterDir): - errorFlag = True - errorMsgs.append( \ - "Invalid cluster directory (--hod.clusterdir or -d) : " + \ - clusterDir + " : Not a directory") - - if int(nodes) < 3 : - errorFlag = True - errorMsgs.append("Invalid nodecount (--hod.nodecount or -n) : " + \ - "Must be >= 3. Given nodes: %s" % nodes) - if errorFlag: - for msg in errorMsgs: - self.__log.critical(msg) - self.__opCode = 3 - return - - if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, \ - (os.R_OK, os.W_OK)): - self.__log.critical(INVALID_STATE_FILE_MSGS[2] % \ - self.__userState.get_state_file()) - self.__opCode = 1 - return - - clusterList = self.__userState.read(CLUSTER_DATA_FILE) - if clusterDir in clusterList.keys(): - self.__setup_cluster_state(clusterDir) - clusterInfo = self.__clusterState.read() - # Check if the job is not running. Only then can we safely - # allocate another cluster. Otherwise the user would need - # to deallocate and free up resources himself. - if clusterInfo.has_key('jobid') and \ - self.__cluster.is_cluster_deallocated(clusterInfo['jobid']): - self.__log.warn("Found a dead cluster at cluster directory '%s'. Deallocating it to allocate a new one." % (clusterDir)) - self.__remove_cluster(clusterDir) - self.__clusterState.clear() - else: - self.__log.critical("Found a previously allocated cluster at cluster directory '%s'. HOD cannot determine if this cluster can be automatically deallocated. Deallocate the cluster if it is unused." % (clusterDir)) - self.__opCode = 12 - return - - self.__setup_cluster_logger(clusterDir) - - (status, message) = self.__cluster.is_valid_account() - if status is not 0: - if message: - for line in message: - self.__log.critical("verify-account output: %s" % line) - self.__log.critical("Cluster cannot be allocated because account verification failed. " \ - + "verify-account returned exit code: %s." % status) - self.__opCode = 4 - return - else: - self.__log.debug("verify-account returned zero exit code.") - if message: - self.__log.debug("verify-account output: %s" % message) - - if re.match('\d+-\d+', nodes): - (min, max) = nodes.split("-") - min = int(min) - max = int(max) - else: - try: - nodes = int(nodes) - min = nodes - max = nodes - except ValueError: - print self.__hodhelp.help(operation) - self.__log.critical( - "%s operation requires a pos_int value for n(nodecount)." % - operation) - self.__opCode = 3 - else: - self.__setup_cluster_state(clusterDir) - clusterInfo = self.__clusterState.read() - self.__opCode = self.__cluster.check_cluster(clusterInfo) - if self.__opCode == 0 or self.__opCode == 15: - self.__setup_service_registry() - if hodInterrupt.isSet(): - self.__cleanup() - raise HodInterruptException() - self.__log.debug("Service Registry started.") - - self.__adjustMasterFailureCountConfig(nodes) - - try: - allocateStatus = self.__cluster.allocate(clusterDir, min, max) - except HodInterruptException, h: - self.__cleanup() - raise h - # Allocation has gone through. - # Don't care about interrupts any more - - try: - if allocateStatus == 0: - self.__set_cluster_state_info(os.environ, - self.__cluster.hdfsInfo, - self.__cluster.mapredInfo, - self.__cluster.ringmasterXRS, - self.__cluster.jobId, - min, max) - self.__setup_cluster_state(clusterDir) - self.__clusterState.write(self.__cluster.jobId, - self.__clusterStateInfo) - # Do we need to check for interrupts here ?? - - self.__set_user_state_info( - { clusterDir : self.__cluster.jobId, } ) - self.__opCode = allocateStatus - except Exception, e: - # Some unknown problem. - self.__cleanup() - self.__cluster.deallocate(clusterDir, self.__clusterStateInfo) - self.__opCode = 1 - raise Exception(e) - elif self.__opCode == 12: - self.__log.critical("Cluster %s already allocated." % clusterDir) - elif self.__opCode == 10: - self.__log.critical("dead\t%s\t%s" % (clusterInfo['jobid'], - clusterDir)) - elif self.__opCode == 13: - self.__log.warn("hdfs dead\t%s\t%s" % (clusterInfo['jobid'], - clusterDir)) - elif self.__opCode == 14: - self.__log.warn("mapred dead\t%s\t%s" % (clusterInfo['jobid'], - clusterDir)) - - if self.__opCode > 0 and self.__opCode != 15: - self.__log.critical("Cannot allocate cluster %s" % clusterDir) - else: - print self.__hodhelp.help(operation) - self.__log.critical("%s operation requires two arguments. " % operation - + "A cluster directory and a nodecount.") - self.__opCode = 3 - - def _is_cluster_allocated(self, clusterDir): - if os.path.isdir(clusterDir): - self.__setup_cluster_state(clusterDir) - clusterInfo = self.__clusterState.read() - if clusterInfo != {}: - return True - return False - - def _op_deallocate(self, args): - operation = "deallocate" - argLength = len(args) - if argLength == 2: - clusterDir = self.__norm_cluster_dir(args[1]) - if os.path.isdir(clusterDir): - self.__setup_cluster_state(clusterDir) - clusterInfo = self.__clusterState.read() - if clusterInfo == {}: - self.__handle_invalid_cluster_directory(clusterDir, cleanUp=True) - else: - self.__opCode = \ - self.__cluster.deallocate(clusterDir, clusterInfo) - # irrespective of whether deallocate failed or not\ - # remove the cluster state. - self.__clusterState.clear() - if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, (os.W_OK,)): - self.__log.critical(INVALID_STATE_FILE_MSGS[3] % \ - self.__userState.get_state_file()) - self.__opCode = 1 - return - self.__remove_cluster(clusterDir) - else: - self.__handle_invalid_cluster_directory(clusterDir, cleanUp=True) - else: - print self.__hodhelp.help(operation) - self.__log.critical("%s operation requires one argument. " % operation - + "A cluster path.") - self.__opCode = 3 - - def _op_list(self, args): - operation = 'list' - clusterList = self.__userState.read(CLUSTER_DATA_FILE) - for path in clusterList.keys(): - if not os.path.isdir(path): - self.__log.info("cluster state unknown\t%s\t%s" % (clusterList[path], path)) - continue - self.__setup_cluster_state(path) - clusterInfo = self.__clusterState.read() - if clusterInfo == {}: - # something wrong with the cluster directory. - self.__log.info("cluster state unknown\t%s\t%s" % (clusterList[path], path)) - continue - clusterStatus = self.__cluster.check_cluster(clusterInfo) - if clusterStatus == 12: - self.__log.info("alive\t%s\t%s" % (clusterList[path], path)) - elif clusterStatus == 10: - self.__log.info("dead\t%s\t%s" % (clusterList[path], path)) - elif clusterStatus == 13: - self.__log.info("hdfs dead\t%s\t%s" % (clusterList[path], path)) - elif clusterStatus == 14: - self.__log.info("mapred dead\t%s\t%s" % (clusterList[path], path)) - - def _op_info(self, args): - operation = 'info' - argLength = len(args) - if argLength == 2: - clusterDir = self.__norm_cluster_dir(args[1]) - if os.path.isdir(clusterDir): - self.__setup_cluster_state(clusterDir) - clusterInfo = self.__clusterState.read() - if clusterInfo == {}: - # something wrong with the cluster directory. - self.__handle_invalid_cluster_directory(clusterDir) - else: - clusterStatus = self.__cluster.check_cluster(clusterInfo) - if clusterStatus == 12: - self.__print_cluster_info(clusterInfo) - self.__log.info("hadoop-site.xml at %s" % clusterDir) - elif clusterStatus == 10: - self.__log.critical("%s cluster is dead" % clusterDir) - elif clusterStatus == 13: - self.__log.warn("%s cluster hdfs is dead" % clusterDir) - elif clusterStatus == 14: - self.__log.warn("%s cluster mapred is dead" % clusterDir) - - if clusterStatus != 12: - if clusterStatus == 15: - self.__log.critical("Cluster %s not allocated." % clusterDir) - else: - self.__print_cluster_info(clusterInfo) - self.__log.info("hadoop-site.xml at %s" % clusterDir) - - self.__opCode = clusterStatus - else: - self.__handle_invalid_cluster_directory(clusterDir) - else: - print self.__hodhelp.help(operation) - self.__log.critical("%s operation requires one argument. " % operation - + "A cluster path.") - self.__opCode = 3 - - def __handle_invalid_cluster_directory(self, clusterDir, cleanUp=False): - if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, (os.R_OK,)): - self.__log.critical(INVALID_STATE_FILE_MSGS[0] % \ - self.__userState.get_state_file()) - self.__opCode = 1 - return - - clusterList = self.__userState.read(CLUSTER_DATA_FILE) - if clusterDir in clusterList.keys(): - # previously allocated cluster. - self.__log.critical("Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (clusterList[clusterDir], clusterDir)) - if cleanUp: - self.__cluster.delete_job(clusterList[clusterDir]) - self.__log.critical("Freeing resources allocated to the cluster.") - if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, (os.W_OK,)): - self.__log.critical(INVALID_STATE_FILE_MSGS[1] % \ - self.__userState.get_state_file()) - self.__opCode = 1 - return - self.__remove_cluster(clusterDir) - self.__opCode = 3 - else: - if not os.path.exists(clusterDir): - self.__log.critical( \ - "Invalid hod.clusterdir(--hod.clusterdir or -d). " + \ - clusterDir + " : No such directory") - elif not os.path.isdir(clusterDir): - self.__log.critical( \ - "Invalid hod.clusterdir(--hod.clusterdir or -d). " + \ - clusterDir + " : Not a directory") - else: - self.__log.critical( \ - "Invalid hod.clusterdir(--hod.clusterdir or -d). " + \ - clusterDir + " : Not tied to any allocated cluster.") - self.__opCode = 15 - - def __print_cluster_info(self, clusterInfo): - keys = clusterInfo.keys() - - _dict = { - 'jobid' : 'Cluster Id', 'min' : 'Nodecount', - 'hdfs' : 'HDFS UI at' , 'mapred' : 'Mapred UI at' - } - - for key in _dict.keys(): - if clusterInfo.has_key(key): - self.__log.info("%s %s" % (_dict[key], clusterInfo[key])) - - if clusterInfo.has_key('ring'): - self.__log.debug("%s\t%s" % ('Ringmaster at ', clusterInfo['ring'])) - - if self.__cfg['hod']['debug'] == 4: - for var in clusterInfo['env'].keys(): - self.__log.debug("%s = %s" % (var, clusterInfo['env'][var])) - - def _op_help(self, arg): - if arg == None or arg.__len__() != 2: - print "hod commands:\n" - for op in self.__ops: - print self.__hodhelp.help(op) - else: - if arg[1] not in self.__ops: - print self.__hodhelp.help('help') - self.__log.critical("Help requested for invalid operation : %s"%arg[1]) - self.__opCode = 3 - else: print self.__hodhelp.help(arg[1]) - - def operation(self): - operation = self.__cfg['hod']['operation'] - try: - opList = self.__check_operation(operation) - if self.__opCode == 0: - if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, (os.R_OK,)): - self.__log.critical(INVALID_STATE_FILE_MSGS[0] % \ - self.__userState.get_state_file()) - self.__opCode = 1 - return self.__opCode - getattr(self, "_op_%s" % opList[0])(opList) - except HodInterruptException, h: - self.__log.critical("op: %s failed because of a process interrupt." \ - % operation) - self.__opCode = HOD_INTERRUPTED_CODE - except: - self.__log.critical("op: %s failed: %s" % (operation, - get_exception_error_string())) - self.__log.debug(get_exception_string()) - - self.__cleanup() - - self.__log.debug("return code: %s" % self.__opCode) - - return self.__opCode - - def script(self): - errorFlag = False - errorMsgs = [] - scriptRet = 0 # return from the script, if run - - script = self.__cfg['hod']['script'] - nodes = self.__cfg['hod']['nodecount'] - clusterDir = self.__cfg['hod']['clusterdir'] - - if not os.path.exists(script): - errorFlag = True - errorMsgs.append("Invalid script file (--hod.script or -s) : " + \ - script + " : No such file") - elif not os.path.isfile(script): - errorFlag = True - errorMsgs.append("Invalid script file (--hod.script or -s) : " + \ - script + " : Not a file.") - else: - isExecutable = os.access(script, os.X_OK) - if not isExecutable: - errorFlag = True - errorMsgs.append("Invalid script file (--hod.script or -s) : " + \ - script + " : Not an executable.") - - if not os.path.exists(clusterDir): - try: - os.makedirs(clusterDir) - except OSError, err: - errorFlag = True - errorMsgs.append("Could not create cluster directory. %s" % (str(err))) - elif not os.path.isdir(clusterDir): - errorFlag = True - errorMsgs.append( \ - "Invalid cluster directory (--hod.clusterdir or -d) : " + \ - clusterDir + " : Not a directory") - - if int(self.__cfg['hod']['nodecount']) < 3 : - errorFlag = True - errorMsgs.append("Invalid nodecount (--hod.nodecount or -n) : " + \ - "Must be >= 3. Given nodes: %s" % nodes) - - if errorFlag: - for msg in errorMsgs: - self.__log.critical(msg) - self.handle_script_exit_code(scriptRet, clusterDir) - sys.exit(3) - - try: - self._op_allocate(('allocate', clusterDir, str(nodes))) - if self.__opCode == 0: - if self.__cfg['hod'].has_key('script-wait-time'): - time.sleep(self.__cfg['hod']['script-wait-time']) - self.__log.debug('Slept for %d time. Now going to run the script' % self.__cfg['hod']['script-wait-time']) - if hodInterrupt.isSet(): - self.__log.debug('Hod interrupted - not executing script') - else: - scriptRunner = hadoopScript(clusterDir, - self.__cfg['hod']['original-dir']) - self.__opCode = scriptRunner.run(script) - scriptRet = self.__opCode - self.__log.info("Exit code from running the script: %d" % self.__opCode) - else: - self.__log.critical("Error %d in allocating the cluster. Cannot run the script." % self.__opCode) - - if hodInterrupt.isSet(): - # Got interrupt while executing script. Unsetting it for deallocating - hodInterrupt.setFlag(False) - if self._is_cluster_allocated(clusterDir): - self._op_deallocate(('deallocate', clusterDir)) - except HodInterruptException, h: - self.__log.critical("Script failed because of a process interrupt.") - self.__opCode = HOD_INTERRUPTED_CODE - except: - self.__log.critical("script: %s failed: %s" % (script, - get_exception_error_string())) - self.__log.debug(get_exception_string()) - - self.__cleanup() - - self.handle_script_exit_code(scriptRet, clusterDir) - - return self.__opCode - - def handle_script_exit_code(self, scriptRet, clusterDir): - # We want to give importance to a failed script's exit code, and write out exit code to a file separately - # so users can easily get it if required. This way they can differentiate between the script's exit code - # and hod's exit code. - if os.path.exists(clusterDir): - exit_code_file_name = (os.path.join(clusterDir, 'script.exitcode')) - if scriptRet != 0: - exit_code_file = open(exit_code_file_name, 'w') - print >>exit_code_file, scriptRet - exit_code_file.close() - self.__opCode = scriptRet - else: - #ensure script exit code file is not there: - if (os.path.exists(exit_code_file_name)): - os.remove(exit_code_file_name) - -class hodHelp: - def __init__(self): - self.ops = ['allocate', 'deallocate', 'info', 'list','script', 'help'] - - self.usage_strings = \ - { - 'allocate' : 'hod allocate -d <clusterdir> -n <nodecount> [OPTIONS]', - 'deallocate' : 'hod deallocate -d <clusterdir> [OPTIONS]', - 'list' : 'hod list [OPTIONS]', - 'info' : 'hod info -d <clusterdir> [OPTIONS]', - 'script' : - 'hod script -d <clusterdir> -n <nodecount> -s <script> [OPTIONS]', - 'help' : 'hod help <OPERATION>', - } - - self.description_strings = \ - { - 'allocate' : "Allocates a cluster of n nodes using the specified \n" + \ - " cluster directory to store cluster state \n" + \ - " information. The Hadoop site XML is also stored \n" + \ - " in this location.\n", - - 'deallocate' : "Deallocates a cluster using the specified \n" + \ - " cluster directory. This operation is also \n" + \ - " required to clean up a dead cluster.\n", - - 'list' : "List all clusters currently allocated by a user, \n" + \ - " along with limited status information and the \n" + \ - " cluster ID.\n", - - 'info' : "Provide detailed information on an allocated cluster.\n", - - 'script' : "Allocates a cluster of n nodes with the given \n" +\ - " cluster directory, runs the specified script \n" + \ - " using the allocated cluster, and then \n" + \ - " deallocates the cluster.\n", - - 'help' : "Print help for the operation and exit.\n" + \ - "Available operations : %s.\n" % self.ops, - } - - def usage(self, op): - return "Usage : " + self.usage_strings[op] + "\n" + \ - "For full description: hod help " + op + ".\n" - - def help(self, op=None): - if op is None: - return "hod <operation> [ARGS] [OPTIONS]\n" + \ - "Available operations : %s\n" % self.ops + \ - "For help on a particular operation : hod help <operation>.\n" + \ - "For all options : hod help options." - else: - return "Usage : " + self.usage_strings[op] + "\n" + \ - "Description : " + self.description_strings[op] + \ - "For all options : hod help options.\n" |