Diffstat (limited to 'python/pyspark/ml/tests.py')
-rwxr-xr-x  python/pyspark/ml/tests.py  97
1 file changed, 44 insertions(+), 53 deletions(-)
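The change is mechanical throughout: every test that previously built its own SQLContext on top of the shared SparkContext now reads a SparkSession held by a common base class. A minimal before/after sketch of the pattern (the sample rows are illustrative only):

    # Before: each test method constructed a throwaway SQLContext.
    sqlContext = SQLContext(self.sc)
    dataset = sqlContext.createDataFrame([(10, 10.0), (50, 50.0)], ["id", "label"])

    # After: tests subclass SparkSessionTestCase (added below), which wraps the
    # class-level SparkContext in a SparkSession once per test class.
    dataset = self.spark.createDataFrame([(10, 10.0), (50, 50.0)], ["id", "label"])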
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index ad1631fb5b..49d3a4a332 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -57,13 +57,25 @@ from pyspark.ml.tuning import *
 from pyspark.ml.wrapper import JavaParams
 from pyspark.mllib.common import _java2py
 from pyspark.mllib.linalg import Vectors, DenseVector, SparseVector
-from pyspark.sql import DataFrame, SQLContext, Row
+from pyspark.sql import DataFrame, Row, SparkSession
 from pyspark.sql.functions import rand
 from pyspark.sql.utils import IllegalArgumentException
 from pyspark.storagelevel import *
 from pyspark.tests import ReusedPySparkTestCase as PySparkTestCase
 
 
+class SparkSessionTestCase(PySparkTestCase):
+    @classmethod
+    def setUpClass(cls):
+        PySparkTestCase.setUpClass()
+        cls.spark = SparkSession(cls.sc)
+
+    @classmethod
+    def tearDownClass(cls):
+        PySparkTestCase.tearDownClass()
+        cls.spark.stop()
+
+
 class MockDataset(DataFrame):
 
     def __init__(self):
@@ -350,7 +362,7 @@ class ParamTests(PySparkTestCase):
         self.assertEqual(model.getWindowSize(), 6)
 
 
-class FeatureTests(PySparkTestCase):
+class FeatureTests(SparkSessionTestCase):
 
     def test_binarizer(self):
         b0 = Binarizer()
@@ -376,8 +388,7 @@ class FeatureTests(PySparkTestCase):
         self.assertEqual(b1.getOutputCol(), "output")
 
     def test_idf(self):
-        sqlContext = SQLContext(self.sc)
-        dataset = sqlContext.createDataFrame([
+        dataset = self.spark.createDataFrame([
             (DenseVector([1.0, 2.0]),),
             (DenseVector([0.0, 1.0]),),
             (DenseVector([3.0, 0.2]),)], ["tf"])
@@ -390,8 +401,7 @@ class FeatureTests(PySparkTestCase):
         self.assertIsNotNone(output.head().idf)
 
     def test_ngram(self):
-        sqlContext = SQLContext(self.sc)
-        dataset = sqlContext.createDataFrame([
+        dataset = self.spark.createDataFrame([
             Row(input=["a", "b", "c", "d", "e"])])
         ngram0 = NGram(n=4, inputCol="input", outputCol="output")
         self.assertEqual(ngram0.getN(), 4)
@@ -401,8 +411,7 @@ class FeatureTests(PySparkTestCase):
         self.assertEqual(transformedDF.head().output, ["a b c d", "b c d e"])
 
     def test_stopwordsremover(self):
-        sqlContext = SQLContext(self.sc)
-        dataset = sqlContext.createDataFrame([Row(input=["a", "panda"])])
+        dataset = self.spark.createDataFrame([Row(input=["a", "panda"])])
         stopWordRemover = StopWordsRemover(inputCol="input", outputCol="output")
         # Default
         self.assertEqual(stopWordRemover.getInputCol(), "input")
@@ -419,15 +428,14 @@ class FeatureTests(PySparkTestCase):
         self.assertEqual(transformedDF.head().output, ["a"])
         # with language selection
         stopwords = StopWordsRemover.loadDefaultStopWords("turkish")
-        dataset = sqlContext.createDataFrame([Row(input=["acaba", "ama", "biri"])])
+        dataset = self.spark.createDataFrame([Row(input=["acaba", "ama", "biri"])])
         stopWordRemover.setStopWords(stopwords)
         self.assertEqual(stopWordRemover.getStopWords(), stopwords)
         transformedDF = stopWordRemover.transform(dataset)
         self.assertEqual(transformedDF.head().output, [])
 
     def test_count_vectorizer_with_binary(self):
-        sqlContext = SQLContext(self.sc)
-        dataset = sqlContext.createDataFrame([
+        dataset = self.spark.createDataFrame([
             (0, "a a a b b c".split(' '), SparseVector(3, {0: 1.0, 1: 1.0, 2: 1.0}),),
             (1, "a a".split(' '), SparseVector(3, {0: 1.0}),),
             (2, "a b".split(' '), SparseVector(3, {0: 1.0, 1: 1.0}),),
@@ -475,11 +483,10 @@ class InducedErrorEstimator(Estimator, HasInducedError):
         return model
 
 
-class CrossValidatorTests(PySparkTestCase):
+class CrossValidatorTests(SparkSessionTestCase):
 
     def test_copy(self):
-        sqlContext = SQLContext(self.sc)
-        dataset = sqlContext.createDataFrame([
+        dataset = self.spark.createDataFrame([
             (10, 10.0),
             (50, 50.0),
             (100, 100.0),
@@ -503,8 +510,7 @@ class CrossValidatorTests(PySparkTestCase):
                         < 0.0001)
 
     def test_fit_minimize_metric(self):
-        sqlContext = SQLContext(self.sc)
-        dataset = sqlContext.createDataFrame([
+        dataset = self.spark.createDataFrame([
             (10, 10.0),
             (50, 50.0),
             (100, 100.0),
@@ -527,8 +533,7 @@ class CrossValidatorTests(PySparkTestCase):
         self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0")
 
     def test_fit_maximize_metric(self):
-        sqlContext = SQLContext(self.sc)
-        dataset = sqlContext.createDataFrame([
+        dataset = self.spark.createDataFrame([
             (10, 10.0),
             (50, 50.0),
             (100, 100.0),
@@ -554,8 +559,7 @@ class CrossValidatorTests(PySparkTestCase):
         # This tests saving and loading the trained model only.
         # Save/load for CrossValidator will be added later: SPARK-13786
         temp_path = tempfile.mkdtemp()
-        sqlContext = SQLContext(self.sc)
-        dataset = sqlContext.createDataFrame(
+        dataset = self.spark.createDataFrame(
             [(Vectors.dense([0.0]), 0.0),
              (Vectors.dense([0.4]), 1.0),
              (Vectors.dense([0.5]), 0.0),
@@ -576,11 +580,10 @@ class CrossValidatorTests(PySparkTestCase):
         self.assertEqual(loadedLrModel.intercept, lrModel.intercept)
 
 
-class TrainValidationSplitTests(PySparkTestCase):
+class TrainValidationSplitTests(SparkSessionTestCase):
 
     def test_fit_minimize_metric(self):
-        sqlContext = SQLContext(self.sc)
-        dataset = sqlContext.createDataFrame([
+        dataset = self.spark.createDataFrame([
             (10, 10.0),
             (50, 50.0),
             (100, 100.0),
@@ -603,8 +606,7 @@ class TrainValidationSplitTests(PySparkTestCase):
         self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0")
 
     def test_fit_maximize_metric(self):
-        sqlContext = SQLContext(self.sc)
-        dataset = sqlContext.createDataFrame([
+        dataset = self.spark.createDataFrame([
             (10, 10.0),
             (50, 50.0),
             (100, 100.0),
@@ -630,8 +632,7 @@ class TrainValidationSplitTests(PySparkTestCase):
         # This tests saving and loading the trained model only.
         # Save/load for TrainValidationSplit will be added later: SPARK-13786
         temp_path = tempfile.mkdtemp()
-        sqlContext = SQLContext(self.sc)
-        dataset = sqlContext.createDataFrame(
+        dataset = self.spark.createDataFrame(
             [(Vectors.dense([0.0]), 0.0),
              (Vectors.dense([0.4]), 1.0),
              (Vectors.dense([0.5]), 0.0),
@@ -652,7 +653,7 @@ class TrainValidationSplitTests(PySparkTestCase):
         self.assertEqual(loadedLrModel.intercept, lrModel.intercept)
 
 
-class PersistenceTest(PySparkTestCase):
+class PersistenceTest(SparkSessionTestCase):
 
     def test_linear_regression(self):
         lr = LinearRegression(maxIter=1)
@@ -724,11 +725,10 @@ class PersistenceTest(PySparkTestCase):
         """
         Pipeline[HashingTF, PCA]
         """
-        sqlContext = SQLContext(self.sc)
         temp_path = tempfile.mkdtemp()
         try:
-            df = sqlContext.createDataFrame([(["a", "b", "c"],), (["c", "d", "e"],)], ["words"])
+            df = self.spark.createDataFrame([(["a", "b", "c"],), (["c", "d", "e"],)], ["words"])
             tf = HashingTF(numFeatures=10, inputCol="words", outputCol="features")
             pca = PCA(k=2, inputCol="features", outputCol="pca_features")
             pl = Pipeline(stages=[tf, pca])
@@ -753,11 +753,10 @@ class PersistenceTest(PySparkTestCase):
         """
         Pipeline[HashingTF, Pipeline[PCA]]
         """
-        sqlContext = SQLContext(self.sc)
         temp_path = tempfile.mkdtemp()
         try:
-            df = sqlContext.createDataFrame([(["a", "b", "c"],), (["c", "d", "e"],)], ["words"])
+            df = self.spark.createDataFrame([(["a", "b", "c"],), (["c", "d", "e"],)], ["words"])
             tf = HashingTF(numFeatures=10, inputCol="words", outputCol="features")
             pca = PCA(k=2, inputCol="features", outputCol="pca_features")
             p0 = Pipeline(stages=[pca])
@@ -816,7 +815,7 @@ class PersistenceTest(PySparkTestCase):
             pass
 
 
-class LDATest(PySparkTestCase):
+class LDATest(SparkSessionTestCase):
 
     def _compare(self, m1, m2):
         """
@@ -836,8 +835,7 @@ class LDATest(PySparkTestCase):
 
     def test_persistence(self):
         # Test save/load for LDA, LocalLDAModel, DistributedLDAModel.
-        sqlContext = SQLContext(self.sc)
-        df = sqlContext.createDataFrame([
+        df = self.spark.createDataFrame([
             [1, Vectors.dense([0.0, 1.0])],
             [2, Vectors.sparse(2, {0: 1.0})],
         ], ["id", "features"])
@@ -871,12 +869,11 @@ class LDATest(PySparkTestCase):
             pass
 
 
-class TrainingSummaryTest(PySparkTestCase):
+class TrainingSummaryTest(SparkSessionTestCase):
 
     def test_linear_regression_summary(self):
         from pyspark.mllib.linalg import Vectors
-        sqlContext = SQLContext(self.sc)
-        df = sqlContext.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)),
+        df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)),
                                          (0.0, 2.0, Vectors.sparse(1, [], []))],
                                         ["label", "weight", "features"])
         lr = LinearRegression(maxIter=5, regParam=0.0, solver="normal", weightCol="weight",
@@ -914,8 +911,7 @@ class TrainingSummaryTest(PySparkTestCase):
 
     def test_logistic_regression_summary(self):
         from pyspark.mllib.linalg import Vectors
-        sqlContext = SQLContext(self.sc)
-        df = sqlContext.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)),
+        df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)),
                                          (0.0, 2.0, Vectors.sparse(1, [], []))],
                                         ["label", "weight", "features"])
         lr = LogisticRegression(maxIter=5, regParam=0.01, weightCol="weight", fitIntercept=False)
@@ -942,11 +938,10 @@ class TrainingSummaryTest(PySparkTestCase):
         self.assertAlmostEqual(sameSummary.areaUnderROC, s.areaUnderROC)
 
 
-class OneVsRestTests(PySparkTestCase):
+class OneVsRestTests(SparkSessionTestCase):
 
     def test_copy(self):
-        sqlContext = SQLContext(self.sc)
-        df = sqlContext.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
+        df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
                                          (1.0, Vectors.sparse(2, [], [])),
                                          (2.0, Vectors.dense(0.5, 0.5))],
                                         ["label", "features"])
@@ -960,8 +955,7 @@ class OneVsRestTests(PySparkTestCase):
         self.assertEqual(model1.getPredictionCol(), "indexed")
 
     def test_output_columns(self):
-        sqlContext = SQLContext(self.sc)
-        df = sqlContext.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
+        df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
                                          (1.0, Vectors.sparse(2, [], [])),
                                          (2.0, Vectors.dense(0.5, 0.5))],
                                         ["label", "features"])
@@ -973,8 +967,7 @@ class OneVsRestTests(PySparkTestCase):
 
     def test_save_load(self):
         temp_path = tempfile.mkdtemp()
-        sqlContext = SQLContext(self.sc)
-        df = sqlContext.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
+        df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
                                          (1.0, Vectors.sparse(2, [], [])),
                                          (2.0, Vectors.dense(0.5, 0.5))],
                                         ["label", "features"])
@@ -994,12 +987,11 @@ class OneVsRestTests(PySparkTestCase):
             self.assertEqual(m.uid, n.uid)
 
 
-class HashingTFTest(PySparkTestCase):
+class HashingTFTest(SparkSessionTestCase):
 
     def test_apply_binary_term_freqs(self):
-        sqlContext = SQLContext(self.sc)
-        df = sqlContext.createDataFrame([(0, ["a", "a", "b", "c", "c", "c"])], ["id", "words"])
+        df = self.spark.createDataFrame([(0, ["a", "a", "b", "c", "c", "c"])], ["id", "words"])
         n = 10
         hashingTF = HashingTF()
         hashingTF.setInputCol("words").setOutputCol("features").setNumFeatures(n).setBinary(True)
@@ -1011,11 +1003,10 @@ class HashingTFTest(PySparkTestCase):
                                    ": expected " + str(expected[i]) + ", got " + str(features[i]))
 
 
-class ALSTest(PySparkTestCase):
+class ALSTest(SparkSessionTestCase):
 
     def test_storage_levels(self):
-        sqlContext = SQLContext(self.sc)
-        df = sqlContext.createDataFrame(
+        df = self.spark.createDataFrame(
             [(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), (2, 1, 1.0), (2, 2, 5.0)],
             ["user", "item", "rating"])
         als = ALS().setMaxIter(1).setRank(1)
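For a test class written against this fixture, subclassing SparkSessionTestCase is all that is needed to get a ready self.spark. A minimal sketch under that assumption (the class and test below are hypothetical, not part of this commit, and rely on the suite's wildcard import of pyspark.ml.feature):

    class TokenizerTests(SparkSessionTestCase):  # hypothetical example class

        def test_tokenizer(self):
            # self.spark was created once in SparkSessionTestCase.setUpClass().
            df = self.spark.createDataFrame([Row(input="a b c")])
            tokenizer = Tokenizer(inputCol="input", outputCol="tokens")
            self.assertEqual(tokenizer.transform(df).head().tokens, ["a", "b", "c"])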