path: root/third_party/hadoop-0.20.0/contrib/hod/testing/testHod.py
diff options
Diffstat (limited to 'third_party/hadoop-0.20.0/contrib/hod/testing/testHod.py')
1 files changed, 310 insertions, 0 deletions
diff --git a/third_party/hadoop-0.20.0/contrib/hod/testing/testHod.py b/third_party/hadoop-0.20.0/contrib/hod/testing/testHod.py
new file mode 100644
index 0000000000..350cccb6e3
--- /dev/null
+++ b/third_party/hadoop-0.20.0/contrib/hod/testing/testHod.py
@@ -0,0 +1,310 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements. See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership. The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License. You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
import unittest, getpass, os, sys, re, threading, time

# Derive the HOD source root from this script's location (everything before
# "/testing/") and put it on sys.path so that the testing.* and hodlib.*
# imports below can be resolved.
myDirectory = os.path.realpath(sys.argv[0])
rootDirectory = re.sub("/testing/.*", "", myDirectory)
sys.path.append(rootDirectory)

import tempfile
from testing.lib import BaseTestSuite, MockLogger, MockHadoopCluster
from hodlib.Hod.hod import hodRunner, hodState
from hodlib.Common.desc import NodePoolDesc

# Handed to BaseTestSuite by HodTestSuite (presumably the tests to skip --
# confirm against testing.lib).
excludes = []

# Information about all clusters is written to a file called clusters.state.
# INVALID_STATE_FILE_MSGS supplies the message templates that the state-file
# tests in this module assert on.
from hodlib.Hod.hod import CLUSTER_DATA_FILE as TEST_CLUSTER_DATA_FILE, \
                           INVALID_STATE_FILE_MSGS

# Temp directory prefix
TMP_DIR_PREFIX = os.path.join('/tmp', 'hod-%s' % (getpass.getuser()))
def setupConf():
    """Build a config object with all required keys for initializing hod.

    Returns a dict holding a 'hod' section, a 'resource_manager' section and
    a 'nodepooldesc' entry built from the resource manager section.
    """
    hodSection = {}
    hodSection['original-dir'] = os.getcwd()
    hodSection['stream'] = True
    # store all the info about clusters in this directory
    hodSection['user_state'] = '/tmp/hodtest'
    hodSection['debug'] = 3
    hodSection['java-home'] = os.getenv('JAVA_HOME')
    hodSection['cluster'] = 'dummy'
    hodSection['cluster-factor'] = 1.8
    hodSection['xrs-port-range'] = (32768,65536)
    hodSection['allocate-wait-time'] = 3600
    hodSection['temp-dir'] = '/tmp/hod'

    # just set everything to dummy. Need something to initialize the
    # node pool description object.
    resourceManagerSection = {}
    for option in ('id', 'batch-home', 'queue'):
        resourceManagerSection[option] = 'dummy'

    conf = {}
    conf['hod'] = hodSection
    conf['resource_manager'] = resourceManagerSection
    conf['nodepooldesc'] = NodePoolDesc(conf['resource_manager'])
    return conf
# Test class that defines methods to test invalid arguments to hod operations.
# Each test drives a hodRunner wired to a MockLogger and a MockHadoopCluster
# and then checks what was logged or which cluster operation was recorded.
class test_InvalidArgsOperations(unittest.TestCase):
    def setUp(self):
        self.cfg = setupConf()
        # initialize the mock objects
        self.log = MockLogger()
        self.cluster = MockHadoopCluster()
        # Use the test logger. This will be used for test verification.
        self.client = hodRunner(self.cfg, log=self.log, cluster=self.cluster)
        # Create the hodState object to set the test state you want.
        self.state = hodState(self.cfg['hod']['user_state'])
        if not os.path.exists(self.cfg['hod']['user_state']):
            # os.mkdir creates the directory; os.path has no mkdir function.
            os.mkdir(self.cfg['hod']['user_state'])
        # ensure cluster data file exists, so write works in the tests.
        f = open(self._clusterDataPath(), 'w')
        f.close()

    def tearDown(self):
        # clean up cluster data file and directory
        os.remove(self._clusterDataPath())
        os.rmdir(self.cfg['hod']['user_state'])

    # Path of the cluster data file inside the configured user_state directory.
    def _clusterDataPath(self):
        return os.path.join(self.cfg['hod']['user_state'],
                            '%s.state' % TEST_CLUSTER_DATA_FILE)

    # Message the tests expect when a recorded cluster directory no longer
    # holds the information for the cluster with the given id.
    def _missingInfoMsg(self, jobid, clusterDir):
        return "Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (jobid, clusterDir)

    # Message the tests expect when the given cluster directory does not exist.
    def _noSuchDirMsg(self, clusterDir):
        return "Invalid hod.clusterdir(--hod.clusterdir or -d). %s : No such directory" % (clusterDir)

    # Deallocate clusterDir and verify that the job was deleted, the expected
    # messages were logged and the entry was removed from the stored state.
    def _checkDeallocation(self, clusterDir, jobid):
        self.client._op_deallocate(['deallocate', clusterDir])
        # verify job was deleted
        self.assertTrue(self.cluster.wasOperationPerformed('delete_job', jobid))
        # verify appropriate message was logged.
        self.assertTrue(self.log.hasMessage(self._missingInfoMsg(jobid, clusterDir), 'critical'))
        self.assertTrue(self.log.hasMessage("Freeing resources allocated to the cluster.", 'critical'))
        # verify that the state information was cleared.
        storedState = self.state.read(TEST_CLUSTER_DATA_FILE)
        self.assertFalse(clusterDir in storedState.keys())

    # Test that list works with deleted cluster directories - more than one entries which are invalid.
    def testListInvalidDirectory(self):
        userState = { os.path.join(TMP_DIR_PREFIX, 'testListInvalidDirectory1') : '123.dummy.id1',
                      os.path.join(TMP_DIR_PREFIX, 'testListInvalidDirectory2') : '123.dummy.id2' }
        self.__setupClusterState(userState)
        self.client._op_list(['list'])
        # assert that required errors are logged.
        for clusterDir in userState:
            self.assertTrue(self.log.hasMessage('cluster state unknown\t%s\t%s' \
                                % (userState[clusterDir], clusterDir), 'info'))

        # simulate a test where a directory is deleted, and created again, without deallocation
        clusterDir = os.path.join(TMP_DIR_PREFIX, 'testListEmptyDirectory')
        os.makedirs(clusterDir)
        try:
            self.assertTrue(os.path.isdir(clusterDir))
            userState = { clusterDir : '123.dummy.id3' }
            self.__setupClusterState(userState, False)
            self.client._op_list(['list'])
            self.assertTrue(self.log.hasMessage('cluster state unknown\t%s\t%s' \
                                % (userState[clusterDir], clusterDir), 'info'))
        finally:
            # remove the scratch directory even when an assertion fails, so
            # that the next run's makedirs does not trip over a leftover.
            os.rmdir(clusterDir)

    # Test that info works with a deleted cluster directory
    def testInfoInvalidDirectory(self):
        clusterDir = os.path.join(TMP_DIR_PREFIX, 'testInfoInvalidDirectory')
        userState = { clusterDir : '456.dummy.id' }
        self.__setupClusterState(userState)
        self.client._op_info(['info', clusterDir])
        self.assertTrue(self.log.hasMessage(
            self._missingInfoMsg(userState[clusterDir], clusterDir), 'critical'))

        # simulate a test where a directory is deleted, and created again, without deallocation
        clusterDir = os.path.join(TMP_DIR_PREFIX, 'testInfoEmptyDirectory')
        os.makedirs(clusterDir)
        try:
            self.assertTrue(os.path.isdir(clusterDir))
            userState = { clusterDir : '456.dummy.id1' }
            self.__setupClusterState(userState, False)
            self.client._op_info(['info', clusterDir])
            self.assertTrue(self.log.hasMessage(
                self._missingInfoMsg(userState[clusterDir], clusterDir), 'critical'))
        finally:
            os.rmdir(clusterDir)

    # Test info works with an invalid cluster directory
    def testInfoNonExistentDirectory(self):
        clusterDir = '/tmp/hod/testInfoNonExistentDirectory'
        self.client._op_info(['info', clusterDir])
        self.assertTrue(self.log.hasMessage(self._noSuchDirMsg(clusterDir), 'critical'))

    # Test that deallocation works on a deleted cluster directory
    # by clearing the job, and removing the state
    def testDeallocateInvalidDirectory(self):
        clusterDir = os.path.join(TMP_DIR_PREFIX,'testDeallocateInvalidDirectory')
        jobid = '789.dummy.id'
        self.__setupClusterState({ clusterDir : jobid })
        self._checkDeallocation(clusterDir, jobid)

        # simulate a test where a directory is deleted, and created again, without deallocation
        clusterDir = os.path.join(TMP_DIR_PREFIX,'testDeallocateEmptyDirectory')
        os.makedirs(clusterDir)
        try:
            self.assertTrue(os.path.isdir(clusterDir))
            jobid = '789.dummy.id1'
            self.__setupClusterState({ clusterDir : jobid }, False)
            self._checkDeallocation(clusterDir, jobid)
        finally:
            os.rmdir(clusterDir)

    # Test that deallocation works on a nonexistent directory.
    def testDeallocateNonExistentDirectory(self):
        clusterDir = os.path.join(TMP_DIR_PREFIX,'testDeallocateNonExistentDirectory')
        self.client._op_deallocate(['deallocate', clusterDir])
        # there should be no call..
        self.assertFalse(self.cluster.wasOperationPerformed('delete_job', None))
        self.assertTrue(self.log.hasMessage(self._noSuchDirMsg(clusterDir), 'critical'))

    # Test that allocation on an previously deleted directory fails.
    def testAllocateOnDeletedDirectory(self):
        clusterDir = os.path.join(TMP_DIR_PREFIX, 'testAllocateOnDeletedDirectory')
        os.makedirs(clusterDir)
        try:
            self.assertTrue(os.path.isdir(clusterDir))
            jobid = '1234.abc.com'
            userState = { clusterDir : jobid }
            self.__setupClusterState(userState, False)
            self.client._op_allocate(['allocate', clusterDir, '3'])
            self.assertTrue(self.log.hasMessage("Found a previously allocated cluster at "\
                          "cluster directory '%s'. HOD cannot determine if this cluster "\
                          "can be automatically deallocated. Deallocate the cluster if it "\
                          "is unused." % (clusterDir), 'critical'))
        finally:
            os.rmdir(clusterDir)

    # Write clusterStateMap (cluster directory -> job id) to the cluster data
    # file and read it back to confirm every entry was stored. When
    # verifyDirIsAbsent is true, first assert that none of the directories exist.
    def __setupClusterState(self, clusterStateMap, verifyDirIsAbsent=True):
        # ensure directory doesn't exist, just in case.
        if verifyDirIsAbsent:
            for clusterDir in clusterStateMap:
                self.assertFalse(os.path.exists(clusterDir))
        # set up required state.
        self.state.write(TEST_CLUSTER_DATA_FILE, clusterStateMap)
        # verify everything is stored correctly.
        state = self.state.read(TEST_CLUSTER_DATA_FILE)
        for clusterDir in clusterStateMap:
            self.assertTrue(clusterDir in state.keys())
            self.assertEqual(clusterStateMap[clusterDir], state[clusterDir])
# Test class that checks hod operations log a critical message and return 1
# when the user state file, or the directory holding it, has permissions that
# were deliberately restricted by the test.
class test_InvalidHodStateFiles(unittest.TestCase):
    # Permission modes used by the tests. They are parsed from octal strings
    # so the spelling is valid on every Python version; a bare 700 would be
    # the decimal number 700, not rwx for the owner.
    MODE_NONE = int('000', 8)        # no permissions at all
    MODE_OWNER_READ = int('400', 8)  # owner may read only
    MODE_OWNER_RX = int('500', 8)    # owner may read and traverse, not write
    MODE_OWNER_RW = int('600', 8)    # owner may read and write, not traverse
    MODE_OWNER_ALL = int('700', 8)   # owner rwx; used to restore permissions

    def setUp(self):
        self.rootDir = '/tmp/hod-%s' % getpass.getuser()
        # mkdtemp requires its parent directory to exist already.
        if not os.path.isdir(self.rootDir):
            os.makedirs(self.rootDir)
        self.cfg = setupConf() # create a conf
        # Modify hod.user_state
        self.cfg['hod']['user_state'] = tempfile.mkdtemp(dir=self.rootDir,
                                  prefix='HodTestSuite.test_InvalidHodStateFiles_')
        self.log = MockLogger() # mock logger
        self.cluster = MockHadoopCluster() # mock hadoop cluster
        self.client = hodRunner(self.cfg, log=self.log, cluster=self.cluster)
        self.state = hodState(self.cfg['hod']['user_state'])
        self.statePath = os.path.join(self.cfg['hod']['user_state'], '%s.state' % \
                                      TEST_CLUSTER_DATA_FILE)
        self.clusterDir = tempfile.mkdtemp(dir=self.rootDir,
                                  prefix='HodTestSuite.test_InvalidHodStateFiles_')

    # Create the user state file, restrict `path` to `mode`, run the operation
    # currently configured on the client and return its result. Permissions
    # are restored and the state file removed even if the operation raises, so
    # tearDown can still delete the directories.
    def _runWithRestrictedMode(self, path, mode):
        stateFile = open(self.statePath, "w")
        try:
            os.chmod(path, mode)
            try:
                return self.client.operation()
            finally:
                os.chmod(path, self.MODE_OWNER_ALL) # restore permissions
        finally:
            stateFile.close()
            os.remove(self.statePath)

    # Assert that the INVALID_STATE_FILE_MSGS entry at msgIndex was logged as
    # critical for the state file, and that the operation returned 1.
    def _assertInvalidStateReported(self, msgIndex, ret):
        # print self.log._MockLogger__logLines
        self.assertTrue(self.log.hasMessage(INVALID_STATE_FILE_MSGS[msgIndex] % \
                            os.path.realpath(self.statePath), 'critical'))
        self.assertEqual(ret, 1)

    def testOperationWithInvalidStateFile(self):
        self.client._hodRunner__cfg['hod']['operation'] = \
                                          "info %s" % self.clusterDir
        # user state file has no read/write permissions
        ret = self._runWithRestrictedMode(self.statePath, self.MODE_NONE)
        self._assertInvalidStateReported(0, ret)

    def testAllocateWithInvalidStateFile(self):
        self.client._hodRunner__cfg['hod']['operation'] = \
                                          "allocate %s %s" % (self.clusterDir, '3')
        # user state file has no write permissions
        ret = self._runWithRestrictedMode(self.statePath, self.MODE_OWNER_READ)
        self._assertInvalidStateReported(2, ret)

    def testAllocateWithInvalidStateStore(self):
        self.client._hodRunner__cfg['hod']['operation'] = \
                                          "allocate %s %s" % (self.clusterDir, 3)
        stateDir = self.cfg['hod']['user_state']

        ###### check with no executable permissions ######
        ret = self._runWithRestrictedMode(stateDir, self.MODE_OWNER_RW)
        self._assertInvalidStateReported(0, ret)

        ###### check with no write permissions ######
        ret = self._runWithRestrictedMode(stateDir, self.MODE_OWNER_RX)
        self._assertInvalidStateReported(0, ret)

    def tearDown(self):
        # remove the two temporary directories created by setUp
        if os.path.exists(self.clusterDir): os.rmdir(self.clusterDir)
        if os.path.exists(self.cfg['hod']['user_state']):
            os.rmdir(self.cfg['hod']['user_state'])
class HodTestSuite(BaseTestSuite):
    """Test suite for this module, built on BaseTestSuite."""

    def __init__(self):
        """Suite setup: initialize the base suite with this module's name and the excludes list."""
        BaseTestSuite.__init__(self, __name__, excludes)

    def cleanUp(self):
        """Suite tearDown: nothing to release."""
        return None
def RunHodTests():
    """Run the tests of this module through HodTestSuite.

    Returns whatever the suite's runTests() call returned; cleanUp() is
    invoked on the suite before returning.
    """
    # modulename_suite
    hodSuite = HodTestSuite()
    outcome = hodSuite.runTests()
    hodSuite.cleanUp()
    return outcome
# Run the suite when this file is executed directly as a script.
if __name__ == "__main__":
    RunHodTests()