diff options
author | DB Tsai <dbt@netflix.com> | 2016-05-17 12:51:07 -0700 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2016-05-17 12:51:07 -0700 |
commit | e2efe0529acd748f26dbaa41331d1733ed256237 (patch) | |
tree | fe1a5aeeadfbf220b5dbe1429e0235153db8117b /python/pyspark/ml/tests.py | |
parent | 9f176dd3918129a72282a6b7a12e2899cbb6dac9 (diff) | |
download | spark-e2efe0529acd748f26dbaa41331d1733ed256237.tar.gz spark-e2efe0529acd748f26dbaa41331d1733ed256237.tar.bz2 spark-e2efe0529acd748f26dbaa41331d1733ed256237.zip |
[SPARK-14615][ML] Use the new ML Vector and Matrix in the ML pipeline based algorithms
## What changes were proposed in this pull request?
Once SPARK-14487 and SPARK-14549 are merged, we will migrate to use the new vector and matrix type in the new ml pipeline based apis.
## How was this patch tested?
Unit tests
Author: DB Tsai <dbt@netflix.com>
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Author: Xiangrui Meng <meng@databricks.com>
Closes #12627 from dbtsai/SPARK-14615-NewML.
Diffstat (limited to 'python/pyspark/ml/tests.py')
-rwxr-xr-x | python/pyspark/ml/tests.py | 136 |
1 files changed, 59 insertions, 77 deletions
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index c567905759..e3511120bd 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -62,10 +62,6 @@ from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, \ from pyspark.ml.tuning import * from pyspark.ml.wrapper import JavaParams from pyspark.mllib.common import _java2py -from pyspark.mllib.linalg import SparseVector as OldSparseVector, DenseVector as OldDenseVector,\ - DenseMatrix as OldDenseMatrix, MatrixUDT as OldMatrixUDT, SparseMatrix as OldSparseMatrix,\ - Vectors as OldVectors, VectorUDT as OldVectorUDT -from pyspark.mllib.regression import LabeledPoint from pyspark.serializers import PickleSerializer from pyspark.sql import DataFrame, Row, SparkSession from pyspark.sql.functions import rand @@ -162,22 +158,22 @@ class ParamTypeConversionTests(PySparkTestCase): def test_vector(self): ewp = ElementwiseProduct(scalingVec=[1, 3]) - self.assertEqual(ewp.getScalingVec(), OldDenseVector([1.0, 3.0])) + self.assertEqual(ewp.getScalingVec(), DenseVector([1.0, 3.0])) ewp = ElementwiseProduct(scalingVec=np.array([1.2, 3.4])) - self.assertEqual(ewp.getScalingVec(), OldDenseVector([1.2, 3.4])) + self.assertEqual(ewp.getScalingVec(), DenseVector([1.2, 3.4])) self.assertRaises(TypeError, lambda: ElementwiseProduct(scalingVec=["a", "b"])) def test_list(self): l = [0, 1] - for lst_like in [l, np.array(l), OldDenseVector(l), OldSparseVector(len(l), + for lst_like in [l, np.array(l), DenseVector(l), SparseVector(len(l), range(len(l)), l), pyarray.array('l', l), xrange(2), tuple(l)]: converted = TypeConverters.toList(lst_like) self.assertEqual(type(converted), list) self.assertListEqual(converted, l) def test_list_int(self): - for indices in [[1.0, 2.0], np.array([1.0, 2.0]), OldDenseVector([1.0, 2.0]), - OldSparseVector(2, {0: 1.0, 1: 2.0}), xrange(1, 3), (1.0, 2.0), + for indices in [[1.0, 2.0], np.array([1.0, 2.0]), DenseVector([1.0, 2.0]), + SparseVector(2, {0: 1.0, 1: 2.0}), xrange(1, 3), (1.0, 2.0), pyarray.array('d', [1.0, 2.0])]: vs = VectorSlicer(indices=indices) self.assertListEqual(vs.getIndices(), [1, 2]) @@ -410,9 +406,9 @@ class FeatureTests(SparkSessionTestCase): def test_idf(self): dataset = self.spark.createDataFrame([ - (OldDenseVector([1.0, 2.0]),), - (OldDenseVector([0.0, 1.0]),), - (OldDenseVector([3.0, 0.2]),)], ["tf"]) + (DenseVector([1.0, 2.0]),), + (DenseVector([0.0, 1.0]),), + (DenseVector([3.0, 0.2]),)], ["tf"]) idf0 = IDF(inputCol="tf") self.assertListEqual(idf0.params, [idf0.inputCol, idf0.minDocFreq, idf0.outputCol]) idf0m = idf0.fit(dataset, {idf0.outputCol: "idf"}) @@ -457,10 +453,10 @@ class FeatureTests(SparkSessionTestCase): def test_count_vectorizer_with_binary(self): dataset = self.spark.createDataFrame([ - (0, "a a a b b c".split(' '), OldSparseVector(3, {0: 1.0, 1: 1.0, 2: 1.0}),), - (1, "a a".split(' '), OldSparseVector(3, {0: 1.0}),), - (2, "a b".split(' '), OldSparseVector(3, {0: 1.0, 1: 1.0}),), - (3, "c".split(' '), OldSparseVector(3, {2: 1.0}),)], ["id", "words", "expected"]) + (0, "a a a b b c".split(' '), SparseVector(3, {0: 1.0, 1: 1.0, 2: 1.0}),), + (1, "a a".split(' '), SparseVector(3, {0: 1.0}),), + (2, "a b".split(' '), SparseVector(3, {0: 1.0, 1: 1.0}),), + (3, "c".split(' '), SparseVector(3, {2: 1.0}),)], ["id", "words", "expected"]) cv = CountVectorizer(binary=True, inputCol="words", outputCol="features") model = cv.fit(dataset) @@ -581,11 +577,11 @@ class CrossValidatorTests(SparkSessionTestCase): # Save/load for CrossValidator will be added later: SPARK-13786 temp_path = tempfile.mkdtemp() dataset = self.spark.createDataFrame( - [(OldVectors.dense([0.0]), 0.0), - (OldVectors.dense([0.4]), 1.0), - (OldVectors.dense([0.5]), 0.0), - (OldVectors.dense([0.6]), 1.0), - (OldVectors.dense([1.0]), 1.0)] * 10, + [(Vectors.dense([0.0]), 0.0), + (Vectors.dense([0.4]), 1.0), + (Vectors.dense([0.5]), 0.0), + (Vectors.dense([0.6]), 1.0), + (Vectors.dense([1.0]), 1.0)] * 10, ["features", "label"]) lr = LogisticRegression() grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build() @@ -654,11 +650,11 @@ class TrainValidationSplitTests(SparkSessionTestCase): # Save/load for TrainValidationSplit will be added later: SPARK-13786 temp_path = tempfile.mkdtemp() dataset = self.spark.createDataFrame( - [(OldVectors.dense([0.0]), 0.0), - (OldVectors.dense([0.4]), 1.0), - (OldVectors.dense([0.5]), 0.0), - (OldVectors.dense([0.6]), 1.0), - (OldVectors.dense([1.0]), 1.0)] * 10, + [(Vectors.dense([0.0]), 0.0), + (Vectors.dense([0.4]), 1.0), + (Vectors.dense([0.5]), 0.0), + (Vectors.dense([0.6]), 1.0), + (Vectors.dense([1.0]), 1.0)] * 10, ["features", "label"]) lr = LogisticRegression() grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build() @@ -857,8 +853,8 @@ class LDATest(SparkSessionTestCase): def test_persistence(self): # Test save/load for LDA, LocalLDAModel, DistributedLDAModel. df = self.spark.createDataFrame([ - [1, OldVectors.dense([0.0, 1.0])], - [2, OldVectors.sparse(2, {0: 1.0})], + [1, Vectors.dense([0.0, 1.0])], + [2, Vectors.sparse(2, {0: 1.0})], ], ["id", "features"]) # Fit model lda = LDA(k=2, seed=1, optimizer="em") @@ -893,8 +889,8 @@ class LDATest(SparkSessionTestCase): class TrainingSummaryTest(SparkSessionTestCase): def test_linear_regression_summary(self): - df = self.spark.createDataFrame([(1.0, 2.0, OldVectors.dense(1.0)), - (0.0, 2.0, OldVectors.sparse(1, [], []))], + df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)), + (0.0, 2.0, Vectors.sparse(1, [], []))], ["label", "weight", "features"]) lr = LinearRegression(maxIter=5, regParam=0.0, solver="normal", weightCol="weight", fitIntercept=False) @@ -930,7 +926,7 @@ class TrainingSummaryTest(SparkSessionTestCase): self.assertAlmostEqual(sameSummary.explainedVariance, s.explainedVariance) def test_glr_summary(self): - from pyspark.mllib.linalg import Vectors + from pyspark.ml.linalg import Vectors df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)), (0.0, 2.0, Vectors.sparse(1, [], []))], ["label", "weight", "features"]) @@ -966,8 +962,8 @@ class TrainingSummaryTest(SparkSessionTestCase): self.assertAlmostEqual(sameSummary.deviance, s.deviance) def test_logistic_regression_summary(self): - df = self.spark.createDataFrame([(1.0, 2.0, OldVectors.dense(1.0)), - (0.0, 2.0, OldVectors.sparse(1, [], []))], + df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)), + (0.0, 2.0, Vectors.sparse(1, [], []))], ["label", "weight", "features"]) lr = LogisticRegression(maxIter=5, regParam=0.01, weightCol="weight", fitIntercept=False) model = lr.fit(df) @@ -996,9 +992,9 @@ class TrainingSummaryTest(SparkSessionTestCase): class OneVsRestTests(SparkSessionTestCase): def test_copy(self): - df = self.spark.createDataFrame([(0.0, OldVectors.dense(1.0, 0.8)), - (1.0, OldVectors.sparse(2, [], [])), - (2.0, OldVectors.dense(0.5, 0.5))], + df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)), + (1.0, Vectors.sparse(2, [], [])), + (2.0, Vectors.dense(0.5, 0.5))], ["label", "features"]) lr = LogisticRegression(maxIter=5, regParam=0.01) ovr = OneVsRest(classifier=lr) @@ -1010,9 +1006,9 @@ class OneVsRestTests(SparkSessionTestCase): self.assertEqual(model1.getPredictionCol(), "indexed") def test_output_columns(self): - df = self.spark.createDataFrame([(0.0, OldVectors.dense(1.0, 0.8)), - (1.0, OldVectors.sparse(2, [], [])), - (2.0, OldVectors.dense(0.5, 0.5))], + df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)), + (1.0, Vectors.sparse(2, [], [])), + (2.0, Vectors.dense(0.5, 0.5))], ["label", "features"]) lr = LogisticRegression(maxIter=5, regParam=0.01) ovr = OneVsRest(classifier=lr) @@ -1022,9 +1018,9 @@ class OneVsRestTests(SparkSessionTestCase): def test_save_load(self): temp_path = tempfile.mkdtemp() - df = self.spark.createDataFrame([(0.0, OldVectors.dense(1.0, 0.8)), - (1.0, OldVectors.sparse(2, [], [])), - (2.0, OldVectors.dense(0.5, 0.5))], + df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)), + (1.0, Vectors.sparse(2, [], [])), + (2.0, Vectors.dense(0.5, 0.5))], ["label", "features"]) lr = LogisticRegression(maxIter=5, regParam=0.01) ovr = OneVsRest(classifier=lr) @@ -1052,7 +1048,7 @@ class HashingTFTest(SparkSessionTestCase): hashingTF.setInputCol("words").setOutputCol("features").setNumFeatures(n).setBinary(True) output = hashingTF.transform(df) features = output.select("features").first().features.toArray() - expected = OldVectors.dense([1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]).toArray() + expected = Vectors.dense([1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]).toArray() for i in range(0, n): self.assertAlmostEqual(features[i], expected[i], 14, "Error at " + str(i) + ": expected " + str(expected[i]) + ", got " + str(features[i])) @@ -1147,15 +1143,13 @@ class VectorTests(MLlibTestCase): self.assertEqual(vs, nvs) def test_serialize(self): - # Because pickle path still uses old vector/matrix - # TODO: Change this to new vector/matrix when pickle for new vector/matrix is ready. - self._test_serialize(OldDenseVector(range(10))) - self._test_serialize(OldDenseVector(array([1., 2., 3., 4.]))) - self._test_serialize(OldDenseVector(pyarray.array('d', range(10)))) - self._test_serialize(OldSparseVector(4, {1: 1, 3: 2})) - self._test_serialize(OldSparseVector(3, {})) - self._test_serialize(OldDenseMatrix(2, 3, range(6))) - sm1 = OldSparseMatrix( + self._test_serialize(DenseVector(range(10))) + self._test_serialize(DenseVector(array([1., 2., 3., 4.]))) + self._test_serialize(DenseVector(pyarray.array('d', range(10)))) + self._test_serialize(SparseVector(4, {1: 1, 3: 2})) + self._test_serialize(SparseVector(3, {})) + self._test_serialize(DenseMatrix(2, 3, range(6))) + sm1 = SparseMatrix( 3, 4, [0, 2, 2, 4, 4], [1, 2, 1, 2], [1.0, 2.0, 4.0, 5.0]) self._test_serialize(sm1) @@ -1407,12 +1401,6 @@ class VectorUDTTests(MLlibTestCase): sv1 = SparseVector(2, [1], [2.0]) udt = VectorUDT() - old_dv0 = OldDenseVector([]) - old_dv1 = OldDenseVector([1.0, 2.0]) - old_sv0 = OldSparseVector(2, [], []) - old_sv1 = OldSparseVector(2, [1], [2.0]) - old_udt = OldVectorUDT() - def test_json_schema(self): self.assertEqual(VectorUDT.fromJson(self.udt.jsonValue()), self.udt) @@ -1421,19 +1409,19 @@ class VectorUDTTests(MLlibTestCase): self.assertEqual(v, self.udt.deserialize(self.udt.serialize(v))) def test_infer_schema(self): - rdd = self.sc.parallelize([LabeledPoint(1.0, self.old_dv1), - LabeledPoint(0.0, self.old_sv1)]) + rdd = self.sc.parallelize([Row(label=1.0, features=self.dv1), + Row(label=0.0, features=self.sv1)]) df = rdd.toDF() schema = df.schema field = [f for f in schema.fields if f.name == "features"][0] - self.assertEqual(field.dataType, self.old_udt) + self.assertEqual(field.dataType, self.udt) vectors = df.rdd.map(lambda p: p.features).collect() self.assertEqual(len(vectors), 2) for v in vectors: - if isinstance(v, OldSparseVector): - self.assertEqual(v, self.old_sv1) - elif isinstance(v, OldDenseVector): - self.assertEqual(v, self.old_dv1) + if isinstance(v, SparseVector): + self.assertEqual(v, self.sv1) + elif isinstance(v, DenseVector): + self.assertEqual(v, self.dv1) else: raise TypeError("expecting a vector but got %r of type %r" % (v, type(v))) @@ -1446,12 +1434,6 @@ class MatrixUDTTests(MLlibTestCase): sm2 = SparseMatrix(2, 1, [0, 0, 1], [0], [5.0], isTransposed=True) udt = MatrixUDT() - old_dm1 = OldDenseMatrix(3, 2, [0, 1, 4, 5, 9, 10]) - old_dm2 = OldDenseMatrix(3, 2, [0, 1, 4, 5, 9, 10], isTransposed=True) - old_sm1 = OldSparseMatrix(1, 1, [0, 1], [0], [2.0]) - old_sm2 = OldSparseMatrix(2, 1, [0, 0, 1], [0], [5.0], isTransposed=True) - old_udt = OldMatrixUDT() - def test_json_schema(self): self.assertEqual(MatrixUDT.fromJson(self.udt.jsonValue()), self.udt) @@ -1460,17 +1442,17 @@ class MatrixUDTTests(MLlibTestCase): self.assertEqual(m, self.udt.deserialize(self.udt.serialize(m))) def test_infer_schema(self): - rdd = self.sc.parallelize([("dense", self.old_dm1), ("sparse", self.old_sm1)]) + rdd = self.sc.parallelize([("dense", self.dm1), ("sparse", self.sm1)]) df = rdd.toDF() schema = df.schema - self.assertTrue(schema.fields[1].dataType, self.old_udt) + self.assertTrue(schema.fields[1].dataType, self.udt) matrices = df.rdd.map(lambda x: x._2).collect() self.assertEqual(len(matrices), 2) for m in matrices: - if isinstance(m, OldDenseMatrix): - self.assertTrue(m, self.old_dm1) - elif isinstance(m, OldSparseMatrix): - self.assertTrue(m, self.old_sm1) + if isinstance(m, DenseMatrix): + self.assertTrue(m, self.dm1) + elif isinstance(m, SparseMatrix): + self.assertTrue(m, self.sm1) else: raise ValueError("Expected a matrix but got type %r" % type(m)) |