diff options
Diffstat (limited to 'python/pyspark/ml/tests.py')
-rwxr-xr-x | python/pyspark/ml/tests.py | 97 |
1 files changed, 44 insertions, 53 deletions
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index ad1631fb5b..49d3a4a332 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -57,13 +57,25 @@ from pyspark.ml.tuning import * from pyspark.ml.wrapper import JavaParams from pyspark.mllib.common import _java2py from pyspark.mllib.linalg import Vectors, DenseVector, SparseVector -from pyspark.sql import DataFrame, SQLContext, Row +from pyspark.sql import DataFrame, Row, SparkSession from pyspark.sql.functions import rand from pyspark.sql.utils import IllegalArgumentException from pyspark.storagelevel import * from pyspark.tests import ReusedPySparkTestCase as PySparkTestCase +class SparkSessionTestCase(PySparkTestCase): + @classmethod + def setUpClass(cls): + PySparkTestCase.setUpClass() + cls.spark = SparkSession(cls.sc) + + @classmethod + def tearDownClass(cls): + PySparkTestCase.tearDownClass() + cls.spark.stop() + + class MockDataset(DataFrame): def __init__(self): @@ -350,7 +362,7 @@ class ParamTests(PySparkTestCase): self.assertEqual(model.getWindowSize(), 6) -class FeatureTests(PySparkTestCase): +class FeatureTests(SparkSessionTestCase): def test_binarizer(self): b0 = Binarizer() @@ -376,8 +388,7 @@ class FeatureTests(PySparkTestCase): self.assertEqual(b1.getOutputCol(), "output") def test_idf(self): - sqlContext = SQLContext(self.sc) - dataset = sqlContext.createDataFrame([ + dataset = self.spark.createDataFrame([ (DenseVector([1.0, 2.0]),), (DenseVector([0.0, 1.0]),), (DenseVector([3.0, 0.2]),)], ["tf"]) @@ -390,8 +401,7 @@ class FeatureTests(PySparkTestCase): self.assertIsNotNone(output.head().idf) def test_ngram(self): - sqlContext = SQLContext(self.sc) - dataset = sqlContext.createDataFrame([ + dataset = self.spark.createDataFrame([ Row(input=["a", "b", "c", "d", "e"])]) ngram0 = NGram(n=4, inputCol="input", outputCol="output") self.assertEqual(ngram0.getN(), 4) @@ -401,8 +411,7 @@ class FeatureTests(PySparkTestCase): self.assertEqual(transformedDF.head().output, ["a b c d", "b c d e"]) def test_stopwordsremover(self): - sqlContext = SQLContext(self.sc) - dataset = sqlContext.createDataFrame([Row(input=["a", "panda"])]) + dataset = self.spark.createDataFrame([Row(input=["a", "panda"])]) stopWordRemover = StopWordsRemover(inputCol="input", outputCol="output") # Default self.assertEqual(stopWordRemover.getInputCol(), "input") @@ -419,15 +428,14 @@ class FeatureTests(PySparkTestCase): self.assertEqual(transformedDF.head().output, ["a"]) # with language selection stopwords = StopWordsRemover.loadDefaultStopWords("turkish") - dataset = sqlContext.createDataFrame([Row(input=["acaba", "ama", "biri"])]) + dataset = self.spark.createDataFrame([Row(input=["acaba", "ama", "biri"])]) stopWordRemover.setStopWords(stopwords) self.assertEqual(stopWordRemover.getStopWords(), stopwords) transformedDF = stopWordRemover.transform(dataset) self.assertEqual(transformedDF.head().output, []) def test_count_vectorizer_with_binary(self): - sqlContext = SQLContext(self.sc) - dataset = sqlContext.createDataFrame([ + dataset = self.spark.createDataFrame([ (0, "a a a b b c".split(' '), SparseVector(3, {0: 1.0, 1: 1.0, 2: 1.0}),), (1, "a a".split(' '), SparseVector(3, {0: 1.0}),), (2, "a b".split(' '), SparseVector(3, {0: 1.0, 1: 1.0}),), @@ -475,11 +483,10 @@ class InducedErrorEstimator(Estimator, HasInducedError): return model -class CrossValidatorTests(PySparkTestCase): +class CrossValidatorTests(SparkSessionTestCase): def test_copy(self): - sqlContext = SQLContext(self.sc) - dataset = sqlContext.createDataFrame([ + dataset = self.spark.createDataFrame([ (10, 10.0), (50, 50.0), (100, 100.0), @@ -503,8 +510,7 @@ class CrossValidatorTests(PySparkTestCase): < 0.0001) def test_fit_minimize_metric(self): - sqlContext = SQLContext(self.sc) - dataset = sqlContext.createDataFrame([ + dataset = self.spark.createDataFrame([ (10, 10.0), (50, 50.0), (100, 100.0), @@ -527,8 +533,7 @@ class CrossValidatorTests(PySparkTestCase): self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0") def test_fit_maximize_metric(self): - sqlContext = SQLContext(self.sc) - dataset = sqlContext.createDataFrame([ + dataset = self.spark.createDataFrame([ (10, 10.0), (50, 50.0), (100, 100.0), @@ -554,8 +559,7 @@ class CrossValidatorTests(PySparkTestCase): # This tests saving and loading the trained model only. # Save/load for CrossValidator will be added later: SPARK-13786 temp_path = tempfile.mkdtemp() - sqlContext = SQLContext(self.sc) - dataset = sqlContext.createDataFrame( + dataset = self.spark.createDataFrame( [(Vectors.dense([0.0]), 0.0), (Vectors.dense([0.4]), 1.0), (Vectors.dense([0.5]), 0.0), @@ -576,11 +580,10 @@ class CrossValidatorTests(PySparkTestCase): self.assertEqual(loadedLrModel.intercept, lrModel.intercept) -class TrainValidationSplitTests(PySparkTestCase): +class TrainValidationSplitTests(SparkSessionTestCase): def test_fit_minimize_metric(self): - sqlContext = SQLContext(self.sc) - dataset = sqlContext.createDataFrame([ + dataset = self.spark.createDataFrame([ (10, 10.0), (50, 50.0), (100, 100.0), @@ -603,8 +606,7 @@ class TrainValidationSplitTests(PySparkTestCase): self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0") def test_fit_maximize_metric(self): - sqlContext = SQLContext(self.sc) - dataset = sqlContext.createDataFrame([ + dataset = self.spark.createDataFrame([ (10, 10.0), (50, 50.0), (100, 100.0), @@ -630,8 +632,7 @@ class TrainValidationSplitTests(PySparkTestCase): # This tests saving and loading the trained model only. # Save/load for TrainValidationSplit will be added later: SPARK-13786 temp_path = tempfile.mkdtemp() - sqlContext = SQLContext(self.sc) - dataset = sqlContext.createDataFrame( + dataset = self.spark.createDataFrame( [(Vectors.dense([0.0]), 0.0), (Vectors.dense([0.4]), 1.0), (Vectors.dense([0.5]), 0.0), @@ -652,7 +653,7 @@ class TrainValidationSplitTests(PySparkTestCase): self.assertEqual(loadedLrModel.intercept, lrModel.intercept) -class PersistenceTest(PySparkTestCase): +class PersistenceTest(SparkSessionTestCase): def test_linear_regression(self): lr = LinearRegression(maxIter=1) @@ -724,11 +725,10 @@ class PersistenceTest(PySparkTestCase): """ Pipeline[HashingTF, PCA] """ - sqlContext = SQLContext(self.sc) temp_path = tempfile.mkdtemp() try: - df = sqlContext.createDataFrame([(["a", "b", "c"],), (["c", "d", "e"],)], ["words"]) + df = self.spark.createDataFrame([(["a", "b", "c"],), (["c", "d", "e"],)], ["words"]) tf = HashingTF(numFeatures=10, inputCol="words", outputCol="features") pca = PCA(k=2, inputCol="features", outputCol="pca_features") pl = Pipeline(stages=[tf, pca]) @@ -753,11 +753,10 @@ class PersistenceTest(PySparkTestCase): """ Pipeline[HashingTF, Pipeline[PCA]] """ - sqlContext = SQLContext(self.sc) temp_path = tempfile.mkdtemp() try: - df = sqlContext.createDataFrame([(["a", "b", "c"],), (["c", "d", "e"],)], ["words"]) + df = self.spark.createDataFrame([(["a", "b", "c"],), (["c", "d", "e"],)], ["words"]) tf = HashingTF(numFeatures=10, inputCol="words", outputCol="features") pca = PCA(k=2, inputCol="features", outputCol="pca_features") p0 = Pipeline(stages=[pca]) @@ -816,7 +815,7 @@ class PersistenceTest(PySparkTestCase): pass -class LDATest(PySparkTestCase): +class LDATest(SparkSessionTestCase): def _compare(self, m1, m2): """ @@ -836,8 +835,7 @@ class LDATest(PySparkTestCase): def test_persistence(self): # Test save/load for LDA, LocalLDAModel, DistributedLDAModel. - sqlContext = SQLContext(self.sc) - df = sqlContext.createDataFrame([ + df = self.spark.createDataFrame([ [1, Vectors.dense([0.0, 1.0])], [2, Vectors.sparse(2, {0: 1.0})], ], ["id", "features"]) @@ -871,12 +869,11 @@ class LDATest(PySparkTestCase): pass -class TrainingSummaryTest(PySparkTestCase): +class TrainingSummaryTest(SparkSessionTestCase): def test_linear_regression_summary(self): from pyspark.mllib.linalg import Vectors - sqlContext = SQLContext(self.sc) - df = sqlContext.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)), + df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)), (0.0, 2.0, Vectors.sparse(1, [], []))], ["label", "weight", "features"]) lr = LinearRegression(maxIter=5, regParam=0.0, solver="normal", weightCol="weight", @@ -914,8 +911,7 @@ class TrainingSummaryTest(PySparkTestCase): def test_logistic_regression_summary(self): from pyspark.mllib.linalg import Vectors - sqlContext = SQLContext(self.sc) - df = sqlContext.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)), + df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)), (0.0, 2.0, Vectors.sparse(1, [], []))], ["label", "weight", "features"]) lr = LogisticRegression(maxIter=5, regParam=0.01, weightCol="weight", fitIntercept=False) @@ -942,11 +938,10 @@ class TrainingSummaryTest(PySparkTestCase): self.assertAlmostEqual(sameSummary.areaUnderROC, s.areaUnderROC) -class OneVsRestTests(PySparkTestCase): +class OneVsRestTests(SparkSessionTestCase): def test_copy(self): - sqlContext = SQLContext(self.sc) - df = sqlContext.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)), + df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)), (1.0, Vectors.sparse(2, [], [])), (2.0, Vectors.dense(0.5, 0.5))], ["label", "features"]) @@ -960,8 +955,7 @@ class OneVsRestTests(PySparkTestCase): self.assertEqual(model1.getPredictionCol(), "indexed") def test_output_columns(self): - sqlContext = SQLContext(self.sc) - df = sqlContext.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)), + df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)), (1.0, Vectors.sparse(2, [], [])), (2.0, Vectors.dense(0.5, 0.5))], ["label", "features"]) @@ -973,8 +967,7 @@ class OneVsRestTests(PySparkTestCase): def test_save_load(self): temp_path = tempfile.mkdtemp() - sqlContext = SQLContext(self.sc) - df = sqlContext.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)), + df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)), (1.0, Vectors.sparse(2, [], [])), (2.0, Vectors.dense(0.5, 0.5))], ["label", "features"]) @@ -994,12 +987,11 @@ class OneVsRestTests(PySparkTestCase): self.assertEqual(m.uid, n.uid) -class HashingTFTest(PySparkTestCase): +class HashingTFTest(SparkSessionTestCase): def test_apply_binary_term_freqs(self): - sqlContext = SQLContext(self.sc) - df = sqlContext.createDataFrame([(0, ["a", "a", "b", "c", "c", "c"])], ["id", "words"]) + df = self.spark.createDataFrame([(0, ["a", "a", "b", "c", "c", "c"])], ["id", "words"]) n = 10 hashingTF = HashingTF() hashingTF.setInputCol("words").setOutputCol("features").setNumFeatures(n).setBinary(True) @@ -1011,11 +1003,10 @@ class HashingTFTest(PySparkTestCase): ": expected " + str(expected[i]) + ", got " + str(features[i])) -class ALSTest(PySparkTestCase): +class ALSTest(SparkSessionTestCase): def test_storage_levels(self): - sqlContext = SQLContext(self.sc) - df = sqlContext.createDataFrame( + df = self.spark.createDataFrame( [(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), (2, 1, 1.0), (2, 2, 5.0)], ["user", "item", "rating"]) als = ALS().setMaxIter(1).setRank(1) |