path: root/python/pyspark/ml/tests.py
author     DB Tsai <dbt@netflix.com>           2016-05-17 12:51:07 -0700
committer  Xiangrui Meng <meng@databricks.com> 2016-05-17 12:51:07 -0700
commit     e2efe0529acd748f26dbaa41331d1733ed256237 (patch)
tree       fe1a5aeeadfbf220b5dbe1429e0235153db8117b /python/pyspark/ml/tests.py
parent     9f176dd3918129a72282a6b7a12e2899cbb6dac9 (diff)
download   spark-e2efe0529acd748f26dbaa41331d1733ed256237.tar.gz
           spark-e2efe0529acd748f26dbaa41331d1733ed256237.tar.bz2
           spark-e2efe0529acd748f26dbaa41331d1733ed256237.zip
[SPARK-14615][ML] Use the new ML Vector and Matrix in the ML pipeline based algorithms
## What changes were proposed in this pull request?

Once SPARK-14487 and SPARK-14549 are merged, we will migrate to the new vector and matrix types in the new ML pipeline-based APIs.

## How was this patch tested?

Unit tests.

Author: DB Tsai <dbt@netflix.com>
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Author: Xiangrui Meng <meng@databricks.com>

Closes #12627 from dbtsai/SPARK-14615-NewML.
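For context, a minimal sketch (not part of this patch) of what the migration means for test code: vectors are now built from `pyspark.ml.linalg` instead of `pyspark.mllib.linalg`, and the DataFrame-based estimators consume them directly. The app name and toy data below are illustrative only.

```python
# Illustrative only: DataFrame-based estimators now take vectors from
# pyspark.ml.linalg rather than the old pyspark.mllib.linalg types.
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors            # new ml vector type
from pyspark.ml.classification import LogisticRegression

spark = SparkSession.builder.appName("ml-linalg-sketch").getOrCreate()

# Build a small training DataFrame using the new dense/sparse vectors.
df = spark.createDataFrame(
    [(Vectors.dense([0.0]), 0.0),
     (Vectors.dense([1.0]), 1.0),
     (Vectors.sparse(1, [0], [0.5]), 1.0)],
    ["features", "label"])

lr = LogisticRegression(maxIter=5, regParam=0.01)
model = lr.fit(df)                # accepts ml.linalg vectors directly
model.transform(df).show()

spark.stop()
```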
Diffstat (limited to 'python/pyspark/ml/tests.py')
-rwxr-xr-x  python/pyspark/ml/tests.py  136
1 file changed, 59 insertions(+), 77 deletions(-)
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index c567905759..e3511120bd 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -62,10 +62,6 @@ from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, \
from pyspark.ml.tuning import *
from pyspark.ml.wrapper import JavaParams
from pyspark.mllib.common import _java2py
-from pyspark.mllib.linalg import SparseVector as OldSparseVector, DenseVector as OldDenseVector,\
- DenseMatrix as OldDenseMatrix, MatrixUDT as OldMatrixUDT, SparseMatrix as OldSparseMatrix,\
- Vectors as OldVectors, VectorUDT as OldVectorUDT
-from pyspark.mllib.regression import LabeledPoint
from pyspark.serializers import PickleSerializer
from pyspark.sql import DataFrame, Row, SparkSession
from pyspark.sql.functions import rand
@@ -162,22 +158,22 @@ class ParamTypeConversionTests(PySparkTestCase):
def test_vector(self):
ewp = ElementwiseProduct(scalingVec=[1, 3])
- self.assertEqual(ewp.getScalingVec(), OldDenseVector([1.0, 3.0]))
+ self.assertEqual(ewp.getScalingVec(), DenseVector([1.0, 3.0]))
ewp = ElementwiseProduct(scalingVec=np.array([1.2, 3.4]))
- self.assertEqual(ewp.getScalingVec(), OldDenseVector([1.2, 3.4]))
+ self.assertEqual(ewp.getScalingVec(), DenseVector([1.2, 3.4]))
self.assertRaises(TypeError, lambda: ElementwiseProduct(scalingVec=["a", "b"]))
def test_list(self):
l = [0, 1]
- for lst_like in [l, np.array(l), OldDenseVector(l), OldSparseVector(len(l),
+ for lst_like in [l, np.array(l), DenseVector(l), SparseVector(len(l),
range(len(l)), l), pyarray.array('l', l), xrange(2), tuple(l)]:
converted = TypeConverters.toList(lst_like)
self.assertEqual(type(converted), list)
self.assertListEqual(converted, l)
def test_list_int(self):
- for indices in [[1.0, 2.0], np.array([1.0, 2.0]), OldDenseVector([1.0, 2.0]),
- OldSparseVector(2, {0: 1.0, 1: 2.0}), xrange(1, 3), (1.0, 2.0),
+ for indices in [[1.0, 2.0], np.array([1.0, 2.0]), DenseVector([1.0, 2.0]),
+ SparseVector(2, {0: 1.0, 1: 2.0}), xrange(1, 3), (1.0, 2.0),
pyarray.array('d', [1.0, 2.0])]:
vs = VectorSlicer(indices=indices)
self.assertListEqual(vs.getIndices(), [1, 2])
@@ -410,9 +406,9 @@ class FeatureTests(SparkSessionTestCase):
def test_idf(self):
dataset = self.spark.createDataFrame([
- (OldDenseVector([1.0, 2.0]),),
- (OldDenseVector([0.0, 1.0]),),
- (OldDenseVector([3.0, 0.2]),)], ["tf"])
+ (DenseVector([1.0, 2.0]),),
+ (DenseVector([0.0, 1.0]),),
+ (DenseVector([3.0, 0.2]),)], ["tf"])
idf0 = IDF(inputCol="tf")
self.assertListEqual(idf0.params, [idf0.inputCol, idf0.minDocFreq, idf0.outputCol])
idf0m = idf0.fit(dataset, {idf0.outputCol: "idf"})
@@ -457,10 +453,10 @@ class FeatureTests(SparkSessionTestCase):
def test_count_vectorizer_with_binary(self):
dataset = self.spark.createDataFrame([
- (0, "a a a b b c".split(' '), OldSparseVector(3, {0: 1.0, 1: 1.0, 2: 1.0}),),
- (1, "a a".split(' '), OldSparseVector(3, {0: 1.0}),),
- (2, "a b".split(' '), OldSparseVector(3, {0: 1.0, 1: 1.0}),),
- (3, "c".split(' '), OldSparseVector(3, {2: 1.0}),)], ["id", "words", "expected"])
+ (0, "a a a b b c".split(' '), SparseVector(3, {0: 1.0, 1: 1.0, 2: 1.0}),),
+ (1, "a a".split(' '), SparseVector(3, {0: 1.0}),),
+ (2, "a b".split(' '), SparseVector(3, {0: 1.0, 1: 1.0}),),
+ (3, "c".split(' '), SparseVector(3, {2: 1.0}),)], ["id", "words", "expected"])
cv = CountVectorizer(binary=True, inputCol="words", outputCol="features")
model = cv.fit(dataset)
@@ -581,11 +577,11 @@ class CrossValidatorTests(SparkSessionTestCase):
# Save/load for CrossValidator will be added later: SPARK-13786
temp_path = tempfile.mkdtemp()
dataset = self.spark.createDataFrame(
- [(OldVectors.dense([0.0]), 0.0),
- (OldVectors.dense([0.4]), 1.0),
- (OldVectors.dense([0.5]), 0.0),
- (OldVectors.dense([0.6]), 1.0),
- (OldVectors.dense([1.0]), 1.0)] * 10,
+ [(Vectors.dense([0.0]), 0.0),
+ (Vectors.dense([0.4]), 1.0),
+ (Vectors.dense([0.5]), 0.0),
+ (Vectors.dense([0.6]), 1.0),
+ (Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
@@ -654,11 +650,11 @@ class TrainValidationSplitTests(SparkSessionTestCase):
# Save/load for TrainValidationSplit will be added later: SPARK-13786
temp_path = tempfile.mkdtemp()
dataset = self.spark.createDataFrame(
- [(OldVectors.dense([0.0]), 0.0),
- (OldVectors.dense([0.4]), 1.0),
- (OldVectors.dense([0.5]), 0.0),
- (OldVectors.dense([0.6]), 1.0),
- (OldVectors.dense([1.0]), 1.0)] * 10,
+ [(Vectors.dense([0.0]), 0.0),
+ (Vectors.dense([0.4]), 1.0),
+ (Vectors.dense([0.5]), 0.0),
+ (Vectors.dense([0.6]), 1.0),
+ (Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
@@ -857,8 +853,8 @@ class LDATest(SparkSessionTestCase):
def test_persistence(self):
# Test save/load for LDA, LocalLDAModel, DistributedLDAModel.
df = self.spark.createDataFrame([
- [1, OldVectors.dense([0.0, 1.0])],
- [2, OldVectors.sparse(2, {0: 1.0})],
+ [1, Vectors.dense([0.0, 1.0])],
+ [2, Vectors.sparse(2, {0: 1.0})],
], ["id", "features"])
# Fit model
lda = LDA(k=2, seed=1, optimizer="em")
@@ -893,8 +889,8 @@ class LDATest(SparkSessionTestCase):
class TrainingSummaryTest(SparkSessionTestCase):
def test_linear_regression_summary(self):
- df = self.spark.createDataFrame([(1.0, 2.0, OldVectors.dense(1.0)),
- (0.0, 2.0, OldVectors.sparse(1, [], []))],
+ df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)),
+ (0.0, 2.0, Vectors.sparse(1, [], []))],
["label", "weight", "features"])
lr = LinearRegression(maxIter=5, regParam=0.0, solver="normal", weightCol="weight",
fitIntercept=False)
@@ -930,7 +926,7 @@ class TrainingSummaryTest(SparkSessionTestCase):
self.assertAlmostEqual(sameSummary.explainedVariance, s.explainedVariance)
def test_glr_summary(self):
- from pyspark.mllib.linalg import Vectors
+ from pyspark.ml.linalg import Vectors
df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)),
(0.0, 2.0, Vectors.sparse(1, [], []))],
["label", "weight", "features"])
@@ -966,8 +962,8 @@ class TrainingSummaryTest(SparkSessionTestCase):
self.assertAlmostEqual(sameSummary.deviance, s.deviance)
def test_logistic_regression_summary(self):
- df = self.spark.createDataFrame([(1.0, 2.0, OldVectors.dense(1.0)),
- (0.0, 2.0, OldVectors.sparse(1, [], []))],
+ df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)),
+ (0.0, 2.0, Vectors.sparse(1, [], []))],
["label", "weight", "features"])
lr = LogisticRegression(maxIter=5, regParam=0.01, weightCol="weight", fitIntercept=False)
model = lr.fit(df)
@@ -996,9 +992,9 @@ class TrainingSummaryTest(SparkSessionTestCase):
class OneVsRestTests(SparkSessionTestCase):
def test_copy(self):
- df = self.spark.createDataFrame([(0.0, OldVectors.dense(1.0, 0.8)),
- (1.0, OldVectors.sparse(2, [], [])),
- (2.0, OldVectors.dense(0.5, 0.5))],
+ df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
+ (1.0, Vectors.sparse(2, [], [])),
+ (2.0, Vectors.dense(0.5, 0.5))],
["label", "features"])
lr = LogisticRegression(maxIter=5, regParam=0.01)
ovr = OneVsRest(classifier=lr)
@@ -1010,9 +1006,9 @@ class OneVsRestTests(SparkSessionTestCase):
self.assertEqual(model1.getPredictionCol(), "indexed")
def test_output_columns(self):
- df = self.spark.createDataFrame([(0.0, OldVectors.dense(1.0, 0.8)),
- (1.0, OldVectors.sparse(2, [], [])),
- (2.0, OldVectors.dense(0.5, 0.5))],
+ df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
+ (1.0, Vectors.sparse(2, [], [])),
+ (2.0, Vectors.dense(0.5, 0.5))],
["label", "features"])
lr = LogisticRegression(maxIter=5, regParam=0.01)
ovr = OneVsRest(classifier=lr)
@@ -1022,9 +1018,9 @@ class OneVsRestTests(SparkSessionTestCase):
def test_save_load(self):
temp_path = tempfile.mkdtemp()
- df = self.spark.createDataFrame([(0.0, OldVectors.dense(1.0, 0.8)),
- (1.0, OldVectors.sparse(2, [], [])),
- (2.0, OldVectors.dense(0.5, 0.5))],
+ df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
+ (1.0, Vectors.sparse(2, [], [])),
+ (2.0, Vectors.dense(0.5, 0.5))],
["label", "features"])
lr = LogisticRegression(maxIter=5, regParam=0.01)
ovr = OneVsRest(classifier=lr)
@@ -1052,7 +1048,7 @@ class HashingTFTest(SparkSessionTestCase):
hashingTF.setInputCol("words").setOutputCol("features").setNumFeatures(n).setBinary(True)
output = hashingTF.transform(df)
features = output.select("features").first().features.toArray()
- expected = OldVectors.dense([1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]).toArray()
+ expected = Vectors.dense([1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]).toArray()
for i in range(0, n):
self.assertAlmostEqual(features[i], expected[i], 14, "Error at " + str(i) +
": expected " + str(expected[i]) + ", got " + str(features[i]))
@@ -1147,15 +1143,13 @@ class VectorTests(MLlibTestCase):
self.assertEqual(vs, nvs)
def test_serialize(self):
- # Because pickle path still uses old vector/matrix
- # TODO: Change this to new vector/matrix when pickle for new vector/matrix is ready.
- self._test_serialize(OldDenseVector(range(10)))
- self._test_serialize(OldDenseVector(array([1., 2., 3., 4.])))
- self._test_serialize(OldDenseVector(pyarray.array('d', range(10))))
- self._test_serialize(OldSparseVector(4, {1: 1, 3: 2}))
- self._test_serialize(OldSparseVector(3, {}))
- self._test_serialize(OldDenseMatrix(2, 3, range(6)))
- sm1 = OldSparseMatrix(
+ self._test_serialize(DenseVector(range(10)))
+ self._test_serialize(DenseVector(array([1., 2., 3., 4.])))
+ self._test_serialize(DenseVector(pyarray.array('d', range(10))))
+ self._test_serialize(SparseVector(4, {1: 1, 3: 2}))
+ self._test_serialize(SparseVector(3, {}))
+ self._test_serialize(DenseMatrix(2, 3, range(6)))
+ sm1 = SparseMatrix(
3, 4, [0, 2, 2, 4, 4], [1, 2, 1, 2], [1.0, 2.0, 4.0, 5.0])
self._test_serialize(sm1)
@@ -1407,12 +1401,6 @@ class VectorUDTTests(MLlibTestCase):
sv1 = SparseVector(2, [1], [2.0])
udt = VectorUDT()
- old_dv0 = OldDenseVector([])
- old_dv1 = OldDenseVector([1.0, 2.0])
- old_sv0 = OldSparseVector(2, [], [])
- old_sv1 = OldSparseVector(2, [1], [2.0])
- old_udt = OldVectorUDT()
-
def test_json_schema(self):
self.assertEqual(VectorUDT.fromJson(self.udt.jsonValue()), self.udt)
@@ -1421,19 +1409,19 @@ class VectorUDTTests(MLlibTestCase):
self.assertEqual(v, self.udt.deserialize(self.udt.serialize(v)))
def test_infer_schema(self):
- rdd = self.sc.parallelize([LabeledPoint(1.0, self.old_dv1),
- LabeledPoint(0.0, self.old_sv1)])
+ rdd = self.sc.parallelize([Row(label=1.0, features=self.dv1),
+ Row(label=0.0, features=self.sv1)])
df = rdd.toDF()
schema = df.schema
field = [f for f in schema.fields if f.name == "features"][0]
- self.assertEqual(field.dataType, self.old_udt)
+ self.assertEqual(field.dataType, self.udt)
vectors = df.rdd.map(lambda p: p.features).collect()
self.assertEqual(len(vectors), 2)
for v in vectors:
- if isinstance(v, OldSparseVector):
- self.assertEqual(v, self.old_sv1)
- elif isinstance(v, OldDenseVector):
- self.assertEqual(v, self.old_dv1)
+ if isinstance(v, SparseVector):
+ self.assertEqual(v, self.sv1)
+ elif isinstance(v, DenseVector):
+ self.assertEqual(v, self.dv1)
else:
raise TypeError("expecting a vector but got %r of type %r" % (v, type(v)))
@@ -1446,12 +1434,6 @@ class MatrixUDTTests(MLlibTestCase):
sm2 = SparseMatrix(2, 1, [0, 0, 1], [0], [5.0], isTransposed=True)
udt = MatrixUDT()
- old_dm1 = OldDenseMatrix(3, 2, [0, 1, 4, 5, 9, 10])
- old_dm2 = OldDenseMatrix(3, 2, [0, 1, 4, 5, 9, 10], isTransposed=True)
- old_sm1 = OldSparseMatrix(1, 1, [0, 1], [0], [2.0])
- old_sm2 = OldSparseMatrix(2, 1, [0, 0, 1], [0], [5.0], isTransposed=True)
- old_udt = OldMatrixUDT()
-
def test_json_schema(self):
self.assertEqual(MatrixUDT.fromJson(self.udt.jsonValue()), self.udt)
@@ -1460,17 +1442,17 @@ class MatrixUDTTests(MLlibTestCase):
self.assertEqual(m, self.udt.deserialize(self.udt.serialize(m)))
def test_infer_schema(self):
- rdd = self.sc.parallelize([("dense", self.old_dm1), ("sparse", self.old_sm1)])
+ rdd = self.sc.parallelize([("dense", self.dm1), ("sparse", self.sm1)])
df = rdd.toDF()
schema = df.schema
- self.assertTrue(schema.fields[1].dataType, self.old_udt)
+ self.assertTrue(schema.fields[1].dataType, self.udt)
matrices = df.rdd.map(lambda x: x._2).collect()
self.assertEqual(len(matrices), 2)
for m in matrices:
- if isinstance(m, OldDenseMatrix):
- self.assertTrue(m, self.old_dm1)
- elif isinstance(m, OldSparseMatrix):
- self.assertTrue(m, self.old_sm1)
+ if isinstance(m, DenseMatrix):
+ self.assertTrue(m, self.dm1)
+ elif isinstance(m, SparseMatrix):
+ self.assertTrue(m, self.sm1)
else:
raise ValueError("Expected a matrix but got type %r" % type(m))