author     Cheng Lian <lian@databricks.com>  2016-02-25 23:07:59 +0800
committer  Cheng Lian <lian@databricks.com>  2016-02-25 23:07:59 +0800
commit     157fe64f3ecbd13b7286560286e50235eecfe30e (patch)
tree       4d8baffc6dc47e6c4e241e003314c2c68b8f9a98 /mllib/src/test/scala/org/apache
parent     4460113d419b5da47ba3c956b8430fd00eb03217 (diff)
[SPARK-13457][SQL] Removes DataFrame RDD operations
## What changes were proposed in this pull request?

This PR removes DataFrame RDD operations. Original calls are now replaced by calls to methods of `DataFrame.rdd`.

## How was this patch tested?

No extra tests are added. Existing tests should do the work.

Author: Cheng Lian <lian@databricks.com>

Closes #11323 from liancheng/remove-df-rdd-ops.
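Every hunk in this patch is the same mechanical rewrite: the RDD operations formerly defined directly on `DataFrame` (`map`, `distinct`, `zip`, `treeAggregate`, ...) are removed, so each call site now inserts an explicit `DataFrame.rdd` conversion first. A minimal before/after sketch of the pattern, assuming a hypothetical `DataFrame` named `df` with a Double column `label` (not part of this patch):

```scala
import org.apache.spark.sql.Row

// Before this patch: DataFrame itself defined RDD operations such as map,
// so this compiled:
//   val labels = df.select("label").map { case Row(label: Double) => label }

// After this patch: go through DataFrame.rdd explicitly to get an RDD[Row].
val labels = df.select("label").rdd.map { case Row(label: Double) => label }

// labels is an ordinary RDD[Double]; RDD-only operations work as before.
val distinctLabelCount = labels.distinct().count()
```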
Diffstat (limited to 'mllib/src/test/scala/org/apache')
-rw-r--r--  mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala             |  2
-rw-r--r--  mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala |  5
-rw-r--r--  mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala                      |  6
-rw-r--r--  mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala                    |  3
-rw-r--r--  mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala                             |  3
-rw-r--r--  mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala                                |  2
-rw-r--r--  mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala                         |  4
-rw-r--r--  mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala                         |  6
-rw-r--r--  mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala                         |  5
-rw-r--r--  mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala                              |  8
-rw-r--r--  mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala                            |  7
-rw-r--r--  mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala                       |  2
-rw-r--r--  mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala                 |  6
-rw-r--r--  mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala                   | 17
14 files changed, 40 insertions(+), 36 deletions(-)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
index 972c0868a4..cfb9bbfd41 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
@@ -735,7 +735,7 @@ class LogisticRegressionSuite
val model1 = trainer1.fit(binaryDataset)
val model2 = trainer2.fit(binaryDataset)
- val histogram = binaryDataset.map { case Row(label: Double, features: Vector) => label }
+ val histogram = binaryDataset.rdd.map { case Row(label: Double, features: Vector) => label }
.treeAggregate(new MultiClassSummarizer)(
seqOp = (c, v) => (c, v) match {
case (classSummarizer: MultiClassSummarizer, label: Double) => classSummarizer.add(label)
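The `treeAggregate` call in the hunk above is defined only on `RDD`, not on `DataFrame`, which is why the histogram computation needs the new explicit `.rdd` step. A minimal sketch of the same shape using a plain sum instead of the test's `MultiClassSummarizer`, assuming a hypothetical `labels: RDD[Double]`:

```scala
// treeAggregate folds each partition locally (seqOp), then merges the
// partial results pairwise in a tree (combOp), reducing driver-side load.
val labelSum = labels.treeAggregate(0.0)(
  seqOp = (sum, label) => sum + label,
  combOp = (a, b) => a + b
)
```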
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala
index a326432d01..602b5a8116 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala
@@ -76,8 +76,9 @@ class MultilayerPerceptronClassifierSuite extends SparkFunSuite with MLlibTestSparkContext
val model = trainer.fit(dataFrame)
val numFeatures = dataFrame.select("features").first().getAs[Vector](0).size
assert(model.numFeatures === numFeatures)
- val mlpPredictionAndLabels = model.transform(dataFrame).select("prediction", "label")
- .map { case Row(p: Double, l: Double) => (p, l) }
+ val mlpPredictionAndLabels = model.transform(dataFrame).select("prediction", "label").rdd.map {
+ case Row(p: Double, l: Double) => (p, l)
+ }
// train multinomial logistic regression
val lr = new LogisticRegressionWithLBFGS()
.setIntercept(true)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
index 445e50d867..2ae74a2090 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
@@ -81,9 +81,9 @@ class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext {
val predictionColSchema = transformedDataset.schema(ovaModel.getPredictionCol)
assert(MetadataUtils.getNumClasses(predictionColSchema) === Some(3))
- val ovaResults = transformedDataset
- .select("prediction", "label")
- .map(row => (row.getDouble(0), row.getDouble(1)))
+ val ovaResults = transformedDataset.select("prediction", "label").rdd.map {
+ row => (row.getDouble(0), row.getDouble(1))
+ }
val lr = new LogisticRegressionWithLBFGS().setIntercept(true).setNumClasses(numClasses)
lr.optimizer.setRegParam(0.1).setNumIterations(100)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
index fc4a4add5d..b719a8c7e7 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
@@ -77,7 +77,8 @@ class BisectingKMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
expectedColumns.foreach { column =>
assert(transformed.columns.contains(column))
}
- val clusters = transformed.select(predictionColName).map(_.getInt(0)).distinct().collect().toSet
+ val clusters =
+ transformed.select(predictionColName).rdd.map(_.getInt(0)).distinct().collect().toSet
assert(clusters.size === k)
assert(clusters === Set(0, 1, 2, 3, 4))
assert(model.computeCost(dataset) < 0.1)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
index e5357ba8e2..c684bc11cc 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
@@ -93,7 +93,8 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest
expectedColumns.foreach { column =>
assert(transformed.columns.contains(column))
}
- val clusters = transformed.select(predictionColName).map(_.getInt(0)).distinct().collect().toSet
+ val clusters =
+ transformed.select(predictionColName).rdd.map(_.getInt(0)).distinct().collect().toSet
assert(clusters.size === k)
assert(clusters === Set(0, 1, 2, 3, 4))
assert(model.computeCost(dataset) < 0.1)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
index 97dbfd9a43..03270401ad 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
@@ -199,7 +199,7 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest
// describeTopics
val topics = model.describeTopics(3)
assert(topics.count() === k)
- assert(topics.select("topic").map(_.getInt(0)).collect().toSet === Range(0, k).toSet)
+ assert(topics.select("topic").rdd.map(_.getInt(0)).collect().toSet === Range(0, k).toSet)
topics.select("termIndices").collect().foreach { case r: Row =>
val termIndices = r.getAs[Seq[Int]](0)
assert(termIndices.length === 3 && termIndices.toSet.size === 3)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala
index 76d12050f9..e238b33ed8 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala
@@ -51,7 +51,7 @@ class OneHotEncoderSuite
.setDropLast(false)
val encoded = encoder.transform(transformed)
- val output = encoded.select("id", "labelVec").map { r =>
+ val output = encoded.select("id", "labelVec").rdd.map { r =>
val vec = r.getAs[Vector](1)
(r.getInt(0), vec(0), vec(1), vec(2))
}.collect().toSet
@@ -68,7 +68,7 @@ class OneHotEncoderSuite
.setOutputCol("labelVec")
val encoded = encoder.transform(transformed)
- val output = encoded.select("id", "labelVec").map { r =>
+ val output = encoded.select("id", "labelVec").rdd.map { r =>
val vec = r.getAs[Vector](1)
(r.getInt(0), vec(0), vec(1))
}.collect().toSet
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
index 5d199ca9b5..dea61648f2 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
@@ -52,7 +52,7 @@ class StringIndexerSuite
val attr = Attribute.fromStructField(transformed.schema("labelIndex"))
.asInstanceOf[NominalAttribute]
assert(attr.values.get === Array("a", "c", "b"))
- val output = transformed.select("id", "labelIndex").map { r =>
+ val output = transformed.select("id", "labelIndex").rdd.map { r =>
(r.getInt(0), r.getDouble(1))
}.collect().toSet
// a -> 0, b -> 2, c -> 1
@@ -83,7 +83,7 @@ class StringIndexerSuite
val attr = Attribute.fromStructField(transformed.schema("labelIndex"))
.asInstanceOf[NominalAttribute]
assert(attr.values.get === Array("b", "a"))
- val output = transformed.select("id", "labelIndex").map { r =>
+ val output = transformed.select("id", "labelIndex").rdd.map { r =>
(r.getInt(0), r.getDouble(1))
}.collect().toSet
// a -> 1, b -> 0
@@ -102,7 +102,7 @@ class StringIndexerSuite
val attr = Attribute.fromStructField(transformed.schema("labelIndex"))
.asInstanceOf[NominalAttribute]
assert(attr.values.get === Array("100", "300", "200"))
- val output = transformed.select("id", "labelIndex").map { r =>
+ val output = transformed.select("id", "labelIndex").rdd.map { r =>
(r.getInt(0), r.getDouble(1))
}.collect().toSet
// 100 -> 0, 200 -> 2, 300 -> 1
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala
index 67817fa4ba..d4f836ef33 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala
@@ -159,7 +159,7 @@ class VectorIndexerSuite extends SparkFunSuite with MLlibTestSparkContext
// Chose correct categorical features
assert(categoryMaps.keys.toSet === categoricalFeatures)
val transformed = model.transform(data).select("indexed")
- val indexedRDD: RDD[Vector] = transformed.map(_.getAs[Vector](0))
+ val indexedRDD: RDD[Vector] = transformed.rdd.map(_.getAs[Vector](0))
val featureAttrs = AttributeGroup.fromStructField(transformed.schema("indexed"))
assert(featureAttrs.name === "indexed")
assert(featureAttrs.attributes.get.length === model.numFeatures)
@@ -216,7 +216,8 @@ class VectorIndexerSuite extends SparkFunSuite with MLlibTestSparkContext
val points = data.collect().map(_.getAs[Vector](0))
val vectorIndexer = getIndexer.setMaxCategories(maxCategories)
val model = vectorIndexer.fit(data)
- val indexedPoints = model.transform(data).select("indexed").map(_.getAs[Vector](0)).collect()
+ val indexedPoints =
+ model.transform(data).select("indexed").rdd.map(_.getAs[Vector](0)).collect()
points.zip(indexedPoints).foreach {
case (orig: SparseVector, indexed: SparseVector) =>
assert(orig.indices.length == indexed.indices.length)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala
index f094c550e5..1671fb6f3a 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala
@@ -100,7 +100,7 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest
.setSeed(42L)
.fit(docDF)
- val realVectors = model.getVectors.sort("word").select("vector").map {
+ val realVectors = model.getVectors.sort("word").select("vector").rdd.map {
case Row(v: Vector) => v
}.collect()
// These expectations are just magic values, characterizing the current
@@ -134,7 +134,7 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest
.fit(docDF)
val expectedSimilarity = Array(0.2608488929093532, -0.8271274846926078)
- val (synonyms, similarity) = model.findSynonyms("a", 2).map {
+ val (synonyms, similarity) = model.findSynonyms("a", 2).rdd.map {
case Row(w: String, sim: Double) => (w, sim)
}.collect().unzip
@@ -161,7 +161,7 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest
.setSeed(42L)
.fit(docDF)
- val (synonyms, similarity) = model.findSynonyms("a", 6).map {
+ val (synonyms, similarity) = model.findSynonyms("a", 6).rdd.map {
case Row(w: String, sim: Double) => (w, sim)
}.collect().unzip
@@ -174,7 +174,7 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest
.setWindowSize(10)
.fit(docDF)
- val (synonymsLarger, similarityLarger) = model.findSynonyms("a", 6).map {
+ val (synonymsLarger, similarityLarger) = model.findSynonyms("a", 6).rdd.map {
case Row(w: String, sim: Double) => (w, sim)
}.collect().unzip
// The similarity score should be very different with the larger window
diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index ff0d8f5568..2bedd70ce9 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -342,11 +342,10 @@ class ALSSuite
.setSeed(0)
val alpha = als.getAlpha
val model = als.fit(training.toDF())
- val predictions = model.transform(test.toDF())
- .select("rating", "prediction")
- .map { case Row(rating: Float, prediction: Float) =>
+ val predictions = model.transform(test.toDF()).select("rating", "prediction").rdd.map {
+ case Row(rating: Float, prediction: Float) =>
(rating.toDouble, prediction.toDouble)
- }
+ }
val rmse =
if (implicitPrefs) {
// TODO: Use a better (rank-based?) evaluation metric for implicit feedback.
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala
index 09326600e6..244db8637b 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala
@@ -87,7 +87,7 @@ class GBTRegressorSuite extends SparkFunSuite with MLlibTestSparkContext {
// copied model must have the same parent.
MLTestingUtils.checkCopy(model)
val preds = model.transform(df)
- val predictions = preds.select("prediction").map(_.getDouble(0))
+ val predictions = preds.select("prediction").rdd.map(_.getDouble(0))
// Checks based on SPARK-8736 (to ensure it is not doing classification)
assert(predictions.max() > 2)
assert(predictions.min() < -1)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala
index f067c29d27..b8874b4cd3 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala
@@ -46,7 +46,7 @@ class IsotonicRegressionSuite
val predictions = model
.transform(dataset)
- .select("prediction").map { case Row(pred) =>
+ .select("prediction").rdd.map { case Row(pred) =>
pred
}.collect()
@@ -66,7 +66,7 @@ class IsotonicRegressionSuite
val predictions = model
.transform(features)
- .select("prediction").map {
+ .select("prediction").rdd.map {
case Row(pred) => pred
}.collect()
@@ -160,7 +160,7 @@ class IsotonicRegressionSuite
val predictions = model
.transform(features)
- .select("prediction").map {
+ .select("prediction").rdd.map {
case Row(pred) => pred
}.collect()
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
index 3ae108d822..9dee04c877 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
@@ -686,17 +686,18 @@ class LinearRegressionSuite
// Residuals in [[LinearRegressionResults]] should equal those manually computed
val expectedResiduals = datasetWithDenseFeature.select("features", "label")
+ .rdd
.map { case Row(features: DenseVector, label: Double) =>
- val prediction =
- features(0) * model.coefficients(0) + features(1) * model.coefficients(1) +
- model.intercept
- label - prediction
- }
- .zip(model.summary.residuals.map(_.getDouble(0)))
+ val prediction =
+ features(0) * model.coefficients(0) + features(1) * model.coefficients(1) +
+ model.intercept
+ label - prediction
+ }
+ .zip(model.summary.residuals.rdd.map(_.getDouble(0)))
.collect()
.foreach { case (manualResidual: Double, resultResidual: Double) =>
- assert(manualResidual ~== resultResidual relTol 1E-5)
- }
+ assert(manualResidual ~== resultResidual relTol 1E-5)
+ }
/*
# Use the following R code to generate model training results.