# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # """ Unit tests for Spark ML Python APIs. """ import sys try: import xmlrunner except ImportError: xmlrunner = None if sys.version_info[:2] <= (2, 6): try: import unittest2 as unittest except ImportError: sys.stderr.write('Please install unittest2 to test with Python 2.6 or earlier') sys.exit(1) else: import unittest from shutil import rmtree import tempfile from pyspark.ml import Estimator, Model, Pipeline, Transformer from pyspark.ml.classification import LogisticRegression from pyspark.ml.clustering import KMeans from pyspark.ml.evaluation import RegressionEvaluator from pyspark.ml.feature import * from pyspark.ml.param import Param, Params from pyspark.ml.param.shared import HasMaxIter, HasInputCol, HasSeed from pyspark.ml.regression import LinearRegression from pyspark.ml.tuning import * from pyspark.ml.util import keyword_only from pyspark.mllib.linalg import DenseVector from pyspark.sql import DataFrame, SQLContext, Row from pyspark.sql.functions import rand from pyspark.tests import ReusedPySparkTestCase as PySparkTestCase class MockDataset(DataFrame): def __init__(self): self.index = 0 class HasFake(Params): def __init__(self): super(HasFake, self).__init__() self.fake = Param(self, "fake", "fake param") def getFake(self): return self.getOrDefault(self.fake) class MockTransformer(Transformer, HasFake): def __init__(self): super(MockTransformer, self).__init__() self.dataset_index = None def _transform(self, dataset): self.dataset_index = dataset.index dataset.index += 1 return dataset class MockEstimator(Estimator, HasFake): def __init__(self): super(MockEstimator, self).__init__() self.dataset_index = None def _fit(self, dataset): self.dataset_index = dataset.index model = MockModel() self._copyValues(model) return model class MockModel(MockTransformer, Model, HasFake): pass class ParamTypeConversionTests(PySparkTestCase): """ Test that param type conversion happens. """ def test_int_to_float(self): from pyspark.mllib.linalg import Vectors df = self.sc.parallelize([ Row(label=1.0, weight=2.0, features=Vectors.dense(1.0))]).toDF() lr = LogisticRegression(elasticNetParam=0) lr.fit(df) lr.setElasticNetParam(0) lr.fit(df) def test_invalid_to_float(self): from pyspark.mllib.linalg import Vectors self.assertRaises(Exception, lambda: LogisticRegression(elasticNetParam="happy")) lr = LogisticRegression(elasticNetParam=0) self.assertRaises(Exception, lambda: lr.setElasticNetParam("panda")) class PipelineTests(PySparkTestCase): def test_pipeline(self): dataset = MockDataset() estimator0 = MockEstimator() transformer1 = MockTransformer() estimator2 = MockEstimator() transformer3 = MockTransformer() pipeline = Pipeline(stages=[estimator0, transformer1, estimator2, transformer3]) pipeline_model = pipeline.fit(dataset, {estimator0.fake: 0, transformer1.fake: 1}) model0, transformer1, model2, transformer3 = pipeline_model.stages self.assertEqual(0, model0.dataset_index) self.assertEqual(0, model0.getFake()) self.assertEqual(1, transformer1.dataset_index) self.assertEqual(1, transformer1.getFake()) self.assertEqual(2, dataset.index) self.assertIsNone(model2.dataset_index, "The last model shouldn't be called in fit.") self.assertIsNone(transformer3.dataset_index, "The last transformer shouldn't be called in fit.") dataset = pipeline_model.transform(dataset) self.assertEqual(2, model0.dataset_index) self.assertEqual(3, transformer1.dataset_index) self.assertEqual(4, model2.dataset_index) self.assertEqual(5, transformer3.dataset_index) self.assertEqual(6, dataset.index) class TestParams(HasMaxIter, HasInputCol, HasSeed): """ A subclass of Params mixed with HasMaxIter, HasInputCol and HasSeed. """ @keyword_only def __init__(self, seed=None): super(TestParams, self).__init__() self._setDefault(maxIter=10) kwargs = self.__init__._input_kwargs self.setParams(**kwargs) @keyword_only def setParams(self, seed=None): """ setParams(self, seed=None) Sets params for this test. """ kwargs = self.setParams._input_kwargs return self._set(**kwargs) class OtherTestParams(HasMaxIter, HasInputCol, HasSeed): """ A subclass of Params mixed with HasMaxIter, HasInputCol and HasSeed. """ @keyword_only def __init__(self, seed=None): super(OtherTestParams, self).__init__() self._setDefault(maxIter=10) kwargs = self.__init__._input_kwargs self.setParams(**kwargs) @keyword_only def setParams(self, seed=None): """ setParams(self, seed=None) Sets params for this test. """ kwargs = self.setParams._input_kwargs return self._set(**kwargs) class ParamTests(PySparkTestCase): def test_copy_new_parent(self): testParams = TestParams() # Copying an instantiated param should fail with self.assertRaises(ValueError): testParams.maxIter._copy_new_parent(testParams) # Copying a dummy param should succeed TestParams.maxIter._copy_new_parent(testParams) maxIter = testParams.maxIter self.assertEqual(maxIter.name, "maxIter") self.assertEqual(maxIter.doc, "max number of iterations (>= 0).") self.assertTrue(maxIter.parent == testParams.uid) def test_param(self): testParams = TestParams() maxIter = testParams.maxIter self.assertEqual(maxIter.name, "maxIter") self.assertEqual(maxIter.doc, "max number of iterations (>= 0).") self.assertTrue(maxIter.parent == testParams.uid) def test_hasparam(self): testParams = TestParams() self.assertTrue(all([testParams.hasParam(p.name) for p in testParams.params])) self.assertFalse(testParams.hasParam("notAParameter")) def test_params(self): testParams = TestParams() maxIter = testParams.maxIter inputCol = testParams.inputCol seed = testParams.seed params = testParams.params self.assertEqual(params, [inputCol, maxIter, seed]) self.assertTrue(testParams.hasParam(maxIter.name)) self.assertTrue(testParams.hasDefault(maxIter)) self.assertFalse(testParams.isSet(maxIter)) self.assertTrue(testParams.isDefined(maxIter)) self.assertEqual(testParams.getMaxIter(), 10) testParams.setMaxIter(100) self.assertTrue(testParams.isSet(maxIter)) self.assertEqual(testParams.getMaxIter(), 100) self.assertTrue(testParams.hasParam(inputCol.name)) self.assertFalse(testParams.hasDefault(inputCol)) self.assertFalse(testParams.isSet(inputCol)) self.assertFalse(testParams.isDefined(inputCol)) with self.assertRaises(KeyError): testParams.getInputCol() # Since the default is normally random, set it to a known number for debug str testParams._setDefault(seed=41) testParams.setSeed(43) self.assertEqual( testParams.explainParams(), "\n".join(["inputCol: input column name. (undefined)", "maxIter: max number of iterations (>= 0). (default: 10, current: 100)", "seed: random seed. (default: 41, current: 43)"])) def test_kmeans_param(self): algo = KMeans() self.assertEqual(algo.getInitMode(), "k-means||") algo.setK(10) self.assertEqual(algo.getK(), 10) algo.setInitSteps(10) self.assertEqual(algo.getInitSteps(), 10) def test_hasseed(self): noSeedSpecd = TestParams() withSeedSpecd = TestParams(seed=42) other = OtherTestParams() # Check that we no longer use 42 as the magic number self.assertNotEqual(noSeedSpecd.getSeed(), 42) origSeed = noSeedSpecd.getSeed() # Check that we only compute the seed once self.assertEqual(noSeedSpecd.getSeed(), origSeed) # Check that a specified seed is honored self.assertEqual(withSeedSpecd.getSeed(), 42) # Check that a different class has a different seed self.assertNotEqual(other.getSeed(), noSeedSpecd.getSeed()) class FeatureTests(PySparkTestCase): def test_binarizer(self): b0 = Binarizer() self.assertListEqual(b0.params, [b0.inputCol, b0.outputCol, b0.threshold]) self.assertTrue(all([~b0.isSet(p) for p in b0.params])) self.assertTrue(b0.hasDefault(b0.threshold)) self.assertEqual(b0.getThreshold(), 0.0) b0.setParams(inputCol="input", outputCol="output").setThreshold(1.0) self.assertTrue(all([b0.isSet(p) for p in b0.params])) self.assertEqual(b0.getThreshold(), 1.0) self.assertEqual(b0.getInputCol(), "input") self.assertEqual(b0.getOutputCol(), "output") b0c = b0.copy({b0.threshold: 2.0}) self.assertEqual(b0c.uid, b0.uid) self.assertListEqual(b0c.params, b0.params) self.assertEqual(b0c.getThreshold(), 2.0) b1 = Binarizer(threshold=2.0, inputCol="input", outputCol="output") self.assertNotEqual(b1.uid, b0.uid) self.assertEqual(b1.getThreshold(), 2.0) self.assertEqual(b1.getInputCol(), "input") self.assertEqual(b1.getOutputCol(), "output") def test_idf(self): sqlContext = SQLContext(self.sc) dataset = sqlContext.createDataFrame([ (DenseVector([1.0, 2.0]),), (DenseVector([0.0, 1.0]),), (DenseVector([3.0, 0.2]),)], ["tf"]) idf0 = IDF(inputCol="tf") self.assertListEqual(idf0.params, [idf0.inputCol, idf0.minDocFreq, idf0.outputCol]) idf0m = idf0.fit(dataset, {idf0.outputCol: "idf"}) self.assertEqual(idf0m.uid, idf0.uid, "Model should inherit the UID from its parent estimator.") output = idf0m.transform(dataset) self.assertIsNotNone(output.head().idf) def test_ngram(self): sqlContext = SQLContext(self.sc) dataset = sqlContext.createDataFrame([ Row(input=["a", "b", "c", "d", "e"])]) ngram0 = NGram(n=4, inputCol="input", outputCol="output") self.assertEqual(ngram0.getN(), 4) self.assertEqual(ngram0.getInputCol(), "input") self.assertEqual(ngram0.getOutputCol(), "output") transformedDF = ngram0.transform(dataset) self.assertEqual(transformedDF.head().output, ["a b c d", "b c d e"]) def test_stopwordsremover(self): sqlContext = SQLContext(self.sc) dataset = sqlContext.createDataFrame([Row(input=["a", "panda"])]) stopWordRemover = StopWordsRemover(inputCol="input", outputCol="output") # Default self.assertEqual(stopWordRemover.getInputCol(), "input") transformedDF = stopWordRemover.transform(dataset) self.assertEqual(transformedDF.head().output, ["panda"]) # Custom stopwords = ["panda"] stopWordRemover.setStopWords(stopwords) self.assertEqual(stopWordRemover.getInputCol(), "input") self.assertEqual(stopWordRemover.getStopWords(), stopwords) transformedDF = stopWordRemover.transform(dataset) self.assertEqual(transformedDF.head().output, ["a"]) class HasInducedError(Params): def __init__(self): super(HasInducedError, self).__init__() self.inducedError = Param(self, "inducedError", "Uniformly-distributed error added to feature") def getInducedError(self): return self.getOrDefault(self.inducedError) class InducedErrorModel(Model, HasInducedError): def __init__(self): super(InducedErrorModel, self).__init__() def _transform(self, dataset): return dataset.withColumn("prediction", dataset.feature + (rand(0) * self.getInducedError())) class InducedErrorEstimator(Estimator, HasInducedError): def __init__(self, inducedError=1.0): super(InducedErrorEstimator, self).__init__() self._set(inducedError=inducedError) def _fit(self, dataset): model = InducedErrorModel() self._copyValues(model) return model class CrossValidatorTests(PySparkTestCase): def test_fit_minimize_metric(self): sqlContext = SQLContext(self.sc) dataset = sqlContext.createDataFrame([ (10, 10.0), (50, 50.0), (100, 100.0), (500, 500.0)] * 10, ["feature", "label"]) iee = InducedErrorEstimator() evaluator = RegressionEvaluator(metricName="rmse") grid = (ParamGridBuilder() .addGrid(iee.inducedError, [100.0, 0.0, 10000.0]) .build()) cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator) cvModel = cv.fit(dataset) bestModel = cvModel.bestModel bestModelMetric = evaluator.evaluate(bestModel.transform(dataset)) self.assertEqual(0.0, bestModel.getOrDefault('inducedError'), "Best model should have zero induced error") self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0") def test_fit_maximize_metric(self): sqlContext = SQLContext(self.sc) dataset = sqlContext.createDataFrame([ (10, 10.0), (50, 50.0), (100, 100.0), (500, 500.0)] * 10, ["feature", "label"]) iee = InducedErrorEstimator() evaluator = RegressionEvaluator(metricName="r2") grid = (ParamGridBuilder() .addGrid(iee.inducedError, [100.0, 0.0, 10000.0]) .build()) cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator) cvModel = cv.fit(dataset) bestModel = cvModel.bestModel bestModelMetric = evaluator.evaluate(bestModel.transform(dataset)) self.assertEqual(0.0, bestModel.getOrDefault('inducedError'), "Best model should have zero induced error") self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1") class TrainValidationSplitTests(PySparkTestCase): def test_fit_minimize_metric(self): sqlContext = SQLContext(self.sc) dataset = sqlContext.createDataFrame([ (10, 10.0), (50, 50.0), (100, 100.0), (500, 500.0)] * 10, ["feature", "label"]) iee = InducedErrorEstimator() evaluator = RegressionEvaluator(metricName="rmse") grid = (ParamGridBuilder() .addGrid(iee.inducedError, [100.0, 0.0, 10000.0]) .build()) tvs = TrainValidationSplit(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator) tvsModel = tvs.fit(dataset) bestModel = tvsModel.bestModel bestModelMetric = evaluator.evaluate(bestModel.transform(dataset)) self.assertEqual(0.0, bestModel.getOrDefault('inducedError'), "Best model should have zero induced error") self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0") def test_fit_maximize_metric(self): sqlContext = SQLContext(self.sc) dataset = sqlContext.createDataFrame([ (10, 10.0), (50, 50.0), (100, 100.0), (500, 500.0)] * 10, ["feature", "label"]) iee = InducedErrorEstimator() evaluator = RegressionEvaluator(metricName="r2") grid = (ParamGridBuilder() .addGrid(iee.inducedError, [100.0, 0.0, 10000.0]) .build()) tvs = TrainValidationSplit(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator) tvsModel = tvs.fit(dataset) bestModel = tvsModel.bestModel bestModelMetric = evaluator.evaluate(bestModel.transform(dataset)) self.assertEqual(0.0, bestModel.getOrDefault('inducedError'), "Best model should have zero induced error") self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1") class PersistenceTest(PySparkTestCase): def test_linear_regression(self): lr = LinearRegression(maxIter=1) path = tempfile.mkdtemp() lr_path = path + "/lr" lr.save(lr_path) lr2 = LinearRegression.load(lr_path) self.assertEqual(lr2.uid, lr2.maxIter.parent, "Loaded LinearRegression instance uid (%s) did not match Param's uid (%s)" % (lr2.uid, lr2.maxIter.parent)) self.assertEqual(lr._defaultParamMap[lr.maxIter], lr2._defaultParamMap[lr2.maxIter], "Loaded LinearRegression instance default params did not match " + "original defaults") try: rmtree(path) except OSError: pass if __name__ == "__main__": from pyspark.ml.tests import * if xmlrunner: unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports')) else: unittest.main()