aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavies Liu <davies.liu@gmail.com>2016-02-25 11:53:48 -0800
committerDavies Liu <davies.liu@gmail.com>2016-02-25 11:53:48 -0800
commit751724b1320d38fd94186df3d8f1ca887f21947a (patch)
treeef365e952284a7ec26aee882caa429a729223c9d
parent46f6e79316b72afea0c9b1559ea662dd3e95e57b (diff)
downloadspark-751724b1320d38fd94186df3d8f1ca887f21947a.tar.gz
spark-751724b1320d38fd94186df3d8f1ca887f21947a.tar.bz2
spark-751724b1320d38fd94186df3d8f1ca887f21947a.zip
Revert "[SPARK-13457][SQL] Removes DataFrame RDD operations"
This reverts commit 157fe64f3ecbd13b7286560286e50235eecfe30e.
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala8
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala1
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/Predictor.scala6
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala13
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala4
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala6
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala1
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala9
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.scala6
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/evaluation/RegressionEvaluator.scala3
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala1
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala1
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala6
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala16
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala8
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala4
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/evaluation/MultilabelMetrics.scala4
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala12
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala2
-rw-r--r--mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala2
-rw-r--r--mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala5
-rw-r--r--mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala6
-rw-r--r--mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala3
-rw-r--r--mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala3
-rw-r--r--mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala2
-rw-r--r--mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala4
-rw-r--r--mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala6
-rw-r--r--mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala5
-rw-r--r--mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala8
-rw-r--r--mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala7
-rw-r--r--mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala2
-rw-r--r--mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala6
-rw-r--r--mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala17
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala42
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala1
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala4
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala2
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala2
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala5
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala2
67 files changed, 159 insertions, 140 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala
index 7e608a2812..0a477abae5 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala
@@ -79,7 +79,7 @@ object DataFrameExample {
labelSummary.show()
// Convert features column to an RDD of vectors.
- val features = df.select("features").rdd.map { case Row(v: Vector) => v }
+ val features = df.select("features").map { case Row(v: Vector) => v }
val featureSummary = features.aggregate(new MultivariateOnlineSummarizer())(
(summary, feat) => summary.add(feat),
(sum1, sum2) => sum1.merge(sum2))
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala
index d2560cc00b..a37d12aa63 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala
@@ -310,8 +310,8 @@ object DecisionTreeExample {
data: DataFrame,
labelColName: String): Unit = {
val fullPredictions = model.transform(data).cache()
- val predictions = fullPredictions.select("prediction").rdd.map(_.getDouble(0))
- val labels = fullPredictions.select(labelColName).rdd.map(_.getDouble(0))
+ val predictions = fullPredictions.select("prediction").map(_.getDouble(0))
+ val labels = fullPredictions.select(labelColName).map(_.getDouble(0))
// Print number of classes for reference
val numClasses = MetadataUtils.getNumClasses(fullPredictions.schema(labelColName)) match {
case Some(n) => n
@@ -335,8 +335,8 @@ object DecisionTreeExample {
data: DataFrame,
labelColName: String): Unit = {
val fullPredictions = model.transform(data).cache()
- val predictions = fullPredictions.select("prediction").rdd.map(_.getDouble(0))
- val labels = fullPredictions.select(labelColName).rdd.map(_.getDouble(0))
+ val predictions = fullPredictions.select("prediction").map(_.getDouble(0))
+ val labels = fullPredictions.select(labelColName).map(_.getDouble(0))
val RMSE = new RegressionMetrics(predictions.zip(labels)).rootMeanSquaredError
println(s" Root mean squared error (RMSE): $RMSE")
}
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala
index a0bb5dabf4..ccee3b2aef 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala
@@ -155,7 +155,7 @@ object OneVsRestExample {
// evaluate the model
val predictionsAndLabels = predictions.select("prediction", "label")
- .rdd.map(row => (row.getDouble(0), row.getDouble(1)))
+ .map(row => (row.getDouble(0), row.getDouble(1)))
val metrics = new MulticlassMetrics(predictionsAndLabels)
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
index 038b2fe611..d28323555b 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
@@ -221,7 +221,6 @@ object LDAExample {
val model = pipeline.fit(df)
val documents = model.transform(df)
.select("features")
- .rdd
.map { case Row(features: Vector) => features }
.zipWithIndex()
.map(_.swap)
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
index 620ff07631..a2f0fcd0e4 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
@@ -52,7 +52,7 @@ object RDDRelation {
val rddFromSql = sqlContext.sql("SELECT key, value FROM records WHERE key < 10")
println("Result of RDD.map:")
- rddFromSql.rdd.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect().foreach(println)
+ rddFromSql.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect().foreach(println)
// Queries can also be written using a LINQ-like Scala DSL.
df.where($"key" === 1).orderBy($"value".asc).select($"key").collect().foreach(println)
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
index b654a2c8d4..4e427f54da 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
@@ -63,7 +63,7 @@ object HiveFromSpark {
val rddFromSql = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
println("Result of RDD.map:")
- val rddAsStrings = rddFromSql.rdd.map {
+ val rddAsStrings = rddFromSql.map {
case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
index 4b27ee6c5a..d1388b5e2e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
@@ -122,10 +122,8 @@ abstract class Predictor[
* and put it in an RDD with strong types.
*/
protected def extractLabeledPoints(dataset: DataFrame): RDD[LabeledPoint] = {
- dataset.select($(labelCol), $(featuresCol)).rdd.map {
- case Row(label: Double, features: Vector) =>
- LabeledPoint(label, features)
- }
+ dataset.select($(labelCol), $(featuresCol))
+ .map { case Row(label: Double, features: Vector) => LabeledPoint(label, features) }
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 0d329d2c08..ac0124513f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -263,11 +263,10 @@ class LogisticRegression @Since("1.2.0") (
protected[spark] def train(dataset: DataFrame, handlePersistence: Boolean):
LogisticRegressionModel = {
val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol))
- val instances: RDD[Instance] =
- dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd.map {
- case Row(label: Double, weight: Double, features: Vector) =>
- Instance(label, weight, features)
- }
+ val instances: RDD[Instance] = dataset.select(col($(labelCol)), w, col($(featuresCol))).map {
+ case Row(label: Double, weight: Double, features: Vector) =>
+ Instance(label, weight, features)
+ }
if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
@@ -791,7 +790,6 @@ sealed trait LogisticRegressionSummary extends Serializable {
/**
* :: Experimental ::
* Logistic regression training results.
- *
* @param predictions dataframe outputted by the model's `transform` method.
* @param probabilityCol field in "predictions" which gives the calibrated probability of
* each instance as a vector.
@@ -815,7 +813,6 @@ class BinaryLogisticRegressionTrainingSummary private[classification] (
/**
* :: Experimental ::
* Binary Logistic regression results for a given model.
- *
* @param predictions dataframe outputted by the model's `transform` method.
* @param probabilityCol field in "predictions" which gives the calibrated probability of
* each instance.
@@ -840,7 +837,7 @@ class BinaryLogisticRegressionSummary private[classification] (
// TODO: Allow the user to vary the number of bins using a setBins method in
// BinaryClassificationMetrics. For now the default is set to 100.
@transient private val binaryMetrics = new BinaryClassificationMetrics(
- predictions.select(probabilityCol, labelCol).rdd.map {
+ predictions.select(probabilityCol, labelCol).map {
case Row(score: Vector, label: Double) => (score(1), label)
}, 100
)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
index f014a1d572..45d293bc69 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
@@ -112,7 +112,7 @@ class BisectingKMeansModel private[ml] (
@Since("2.0.0")
def computeCost(dataset: DataFrame): Double = {
SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT)
- val data = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => point }
+ val data = dataset.select(col($(featuresCol))).map { case Row(point: Vector) => point }
parentModel.computeCost(data)
}
}
@@ -176,7 +176,7 @@ class BisectingKMeans @Since("2.0.0") (
@Since("2.0.0")
override def fit(dataset: DataFrame): BisectingKMeansModel = {
- val rdd = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => point }
+ val rdd = dataset.select(col($(featuresCol))).map { case Row(point: Vector) => point }
val bkm = new MLlibBisectingKMeans()
.setK($(k))
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
index 79332b0d02..c6a3eac587 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
@@ -130,7 +130,7 @@ class KMeansModel private[ml] (
@Since("1.6.0")
def computeCost(dataset: DataFrame): Double = {
SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT)
- val data = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => point }
+ val data = dataset.select(col($(featuresCol))).map { case Row(point: Vector) => point }
parentModel.computeCost(data)
}
@@ -260,7 +260,7 @@ class KMeans @Since("1.5.0") (
@Since("1.5.0")
override def fit(dataset: DataFrame): KMeansModel = {
- val rdd = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => point }
+ val rdd = dataset.select(col($(featuresCol))).map { case Row(point: Vector) => point }
val algo = new MLlibKMeans()
.setK($(k))
@@ -303,7 +303,7 @@ class KMeansSummary private[clustering] (
* Size of each cluster.
*/
@Since("2.0.0")
- lazy val size: Array[Int] = cluster.rdd.map {
+ lazy val size: Array[Int] = cluster.map {
case Row(clusterIdx: Int) => (clusterIdx, 1)
}.reduceByKey(_ + _).collect().sortBy(_._1).map(_._2)
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index 6304b20d54..99383e77f7 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -803,7 +803,6 @@ private[clustering] object LDA extends DefaultParamsReadable[LDA] {
dataset
.withColumn("docId", monotonicallyIncreasingId())
.select("docId", featuresCol)
- .rdd
.map { case Row(docId: Long, features: Vector) =>
(docId, features)
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala
index 00f3125584..a1d36c4bec 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala
@@ -84,10 +84,11 @@ class BinaryClassificationEvaluator @Since("1.4.0") (@Since("1.4.0") override va
SchemaUtils.checkColumnType(schema, $(labelCol), DoubleType)
// TODO: When dataset metadata has been implemented, check rawPredictionCol vector length = 2.
- val scoreAndLabels = dataset.select($(rawPredictionCol), $(labelCol)).rdd.map {
- case Row(rawPrediction: Vector, label: Double) => (rawPrediction(1), label)
- case Row(rawPrediction: Double, label: Double) => (rawPrediction, label)
- }
+ val scoreAndLabels = dataset.select($(rawPredictionCol), $(labelCol))
+ .map {
+ case Row(rawPrediction: Vector, label: Double) => (rawPrediction(1), label)
+ case Row(rawPrediction: Double, label: Double) => (rawPrediction, label)
+ }
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val metric = $(metricName) match {
case "areaUnderROC" => metrics.areaUnderROC()
diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.scala
index 55ff44323a..a921153b94 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.scala
@@ -74,9 +74,9 @@ class MulticlassClassificationEvaluator @Since("1.5.0") (@Since("1.5.0") overrid
SchemaUtils.checkColumnType(schema, $(predictionCol), DoubleType)
SchemaUtils.checkColumnType(schema, $(labelCol), DoubleType)
- val predictionAndLabels = dataset.select($(predictionCol), $(labelCol)).rdd.map {
- case Row(prediction: Double, label: Double) =>
- (prediction, label)
+ val predictionAndLabels = dataset.select($(predictionCol), $(labelCol))
+ .map { case Row(prediction: Double, label: Double) =>
+ (prediction, label)
}
val metrics = new MulticlassMetrics(predictionAndLabels)
val metric = $(metricName) match {
diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/RegressionEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/RegressionEvaluator.scala
index adee61e297..b6b25ecd01 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/RegressionEvaluator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/RegressionEvaluator.scala
@@ -85,8 +85,7 @@ final class RegressionEvaluator @Since("1.4.0") (@Since("1.4.0") override val ui
val predictionAndLabels = dataset
.select(col($(predictionCol)).cast(DoubleType), col($(labelCol)).cast(DoubleType))
- .rdd.
- map { case Row(prediction: Double, label: Double) =>
+ .map { case Row(prediction: Double, label: Double) =>
(prediction, label)
}
val metrics = new RegressionMetrics(predictionAndLabels)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala
index 4abc459f53..7b565ef3ed 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala
@@ -79,7 +79,7 @@ final class ChiSqSelector(override val uid: String)
override def fit(dataset: DataFrame): ChiSqSelectorModel = {
transformSchema(dataset.schema, logging = true)
- val input = dataset.select($(labelCol), $(featuresCol)).rdd.map {
+ val input = dataset.select($(labelCol), $(featuresCol)).map {
case Row(label: Double, features: Vector) =>
LabeledPoint(label, features)
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala
index cf151458f0..a6dfe58e56 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala
@@ -133,7 +133,7 @@ class CountVectorizer(override val uid: String)
override def fit(dataset: DataFrame): CountVectorizerModel = {
transformSchema(dataset.schema, logging = true)
val vocSize = $(vocabSize)
- val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+ val input = dataset.select($(inputCol)).map(_.getAs[Seq[String]](0))
val minDf = if ($(minDF) >= 1.0) {
$(minDF)
} else {
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
index cebbe5c162..9e7eee4f29 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
@@ -79,7 +79,7 @@ final class IDF(override val uid: String) extends Estimator[IDFModel] with IDFBa
override def fit(dataset: DataFrame): IDFModel = {
transformSchema(dataset.schema, logging = true)
- val input = dataset.select($(inputCol)).rdd.map { case Row(v: Vector) => v }
+ val input = dataset.select($(inputCol)).map { case Row(v: Vector) => v }
val idf = new feature.IDF($(minDocFreq)).fit(input)
copyValues(new IDFModel(uid, idf).setParent(this))
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
index 18be5c0701..ad0458d0d0 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
@@ -108,7 +108,7 @@ class MinMaxScaler(override val uid: String)
override def fit(dataset: DataFrame): MinMaxScalerModel = {
transformSchema(dataset.schema, logging = true)
- val input = dataset.select($(inputCol)).rdd.map { case Row(v: Vector) => v }
+ val input = dataset.select($(inputCol)).map { case Row(v: Vector) => v }
val summary = Statistics.colStats(input)
copyValues(new MinMaxScalerModel(uid, summary.min, summary.max).setParent(this))
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala
index e9df161c00..342540418f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala
@@ -130,7 +130,7 @@ class OneHotEncoder(override val uid: String) extends Transformer
transformSchema(dataset.schema)(outputColName))
if (outputAttrGroup.size < 0) {
// If the number of attributes is unknown, we check the values from the input column.
- val numAttrs = dataset.select(col(inputColName).cast(DoubleType)).rdd.map(_.getDouble(0))
+ val numAttrs = dataset.select(col(inputColName).cast(DoubleType)).map(_.getDouble(0))
.aggregate(0.0)(
(m, x) => {
assert(x >=0.0 && x == x.toInt,
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
index 80b124f747..0e07dfabfe 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
@@ -70,7 +70,7 @@ class PCA (override val uid: String) extends Estimator[PCAModel] with PCAParams
*/
override def fit(dataset: DataFrame): PCAModel = {
transformSchema(dataset.schema, logging = true)
- val input = dataset.select($(inputCol)).rdd.map { case Row(v: Vector) => v}
+ val input = dataset.select($(inputCol)).map { case Row(v: Vector) => v}
val pca = new feature.PCA(k = $(k))
val pcaModel = pca.fit(input)
copyValues(new PCAModel(uid, pcaModel.pc, pcaModel.explainedVariance).setParent(this))
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
index 9952d3bc9f..6a0b6c240e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
@@ -87,7 +87,7 @@ class StandardScaler(override val uid: String) extends Estimator[StandardScalerM
override def fit(dataset: DataFrame): StandardScalerModel = {
transformSchema(dataset.schema, logging = true)
- val input = dataset.select($(inputCol)).rdd.map { case Row(v: Vector) => v }
+ val input = dataset.select($(inputCol)).map { case Row(v: Vector) => v }
val scaler = new feature.StandardScaler(withMean = $(withMean), withStd = $(withStd))
val scalerModel = scaler.fit(input)
copyValues(new StandardScalerModel(uid, scalerModel.std, scalerModel.mean).setParent(this))
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
index e8b617d9c8..912bd95a2e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
@@ -83,7 +83,6 @@ class StringIndexer(override val uid: String) extends Estimator[StringIndexerMod
override def fit(dataset: DataFrame): StringIndexerModel = {
val counts = dataset.select(col($(inputCol)).cast(StringType))
- .rdd
.map(_.getString(0))
.countByValue()
val labels = counts.toSeq.sortBy(-_._2).map(_._1).toArray
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
index 5c11760fab..2a5268406d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
@@ -113,7 +113,7 @@ class VectorIndexer(override val uid: String) extends Estimator[VectorIndexerMod
val firstRow = dataset.select($(inputCol)).take(1)
require(firstRow.length == 1, s"VectorIndexer cannot be fit on an empty dataset.")
val numFeatures = firstRow(0).getAs[Vector](0).size
- val vectorDataset = dataset.select($(inputCol)).rdd.map { case Row(v: Vector) => v }
+ val vectorDataset = dataset.select($(inputCol)).map { case Row(v: Vector) => v }
val maxCats = $(maxCategories)
val categoryStats: VectorIndexer.CategoryStats = vectorDataset.mapPartitions { iter =>
val localCatStats = new VectorIndexer.CategoryStats(numFeatures, maxCats)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
index a4c3d2751f..2b6b3c3a0f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
@@ -138,7 +138,7 @@ final class Word2Vec(override val uid: String) extends Estimator[Word2VecModel]
override def fit(dataset: DataFrame): Word2VecModel = {
transformSchema(dataset.schema, logging = true)
- val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+ val input = dataset.select($(inputCol)).map(_.getAs[Seq[String]](0))
val wordVectors = new feature.Word2Vec()
.setLearningRate($(stepSize))
.setMinCount($(minCount))
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index dacdac9a1d..4be4d6abed 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -392,7 +392,6 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
val r = if ($(ratingCol) != "") col($(ratingCol)).cast(FloatType) else lit(1.0f)
val ratings = dataset
.select(col($(userCol)).cast(IntegerType), col($(itemCol)).cast(IntegerType), r)
- .rdd
.map { row =>
Rating(row.getInt(0), row.getInt(1), row.getFloat(2))
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
index e4339d67b9..1e5b4cb83c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
@@ -184,7 +184,7 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
* and put it in an RDD with strong types.
*/
protected[ml] def extractAFTPoints(dataset: DataFrame): RDD[AFTPoint] = {
- dataset.select($(featuresCol), $(labelCol), $(censorCol)).rdd.map {
+ dataset.select($(featuresCol), $(labelCol), $(censorCol)).map {
case Row(features: Vector, label: Double, censor: Double) =>
AFTPoint(features, label, censor)
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
index 36b006c10e..1573bb4c1b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
@@ -90,9 +90,9 @@ private[regression] trait IsotonicRegressionBase extends Params with HasFeatures
} else {
lit(1.0)
}
- dataset.select(col($(labelCol)), f, w).rdd.map {
- case Row(label: Double, feature: Double, weight: Double) =>
- (label, feature, weight)
+ dataset.select(col($(labelCol)), f, w)
+ .map { case Row(label: Double, feature: Double, weight: Double) =>
+ (label, feature, weight)
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index 8f78fd122f..ccfb5c4b9d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -158,7 +158,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
override protected def train(dataset: DataFrame): LinearRegressionModel = {
// Extract the number of features before deciding optimization solver.
- val numFeatures = dataset.select(col($(featuresCol))).limit(1).rdd.map {
+ val numFeatures = dataset.select(col($(featuresCol))).limit(1).map {
case Row(features: Vector) => features.size
}.first()
val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol))
@@ -170,7 +170,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
// For low dimensional data, WeightedLeastSquares is more efficiently since the
// training algorithm only requires one pass through the data. (SPARK-10668)
val instances: RDD[Instance] = dataset.select(
- col($(labelCol)), w, col($(featuresCol))).rdd.map {
+ col($(labelCol)), w, col($(featuresCol))).map {
case Row(label: Double, weight: Double, features: Vector) =>
Instance(label, weight, features)
}
@@ -196,11 +196,10 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
return lrModel.setSummary(trainingSummary)
}
- val instances: RDD[Instance] =
- dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd.map {
- case Row(label: Double, weight: Double, features: Vector) =>
- Instance(label, weight, features)
- }
+ val instances: RDD[Instance] = dataset.select(col($(labelCol)), w, col($(featuresCol))).map {
+ case Row(label: Double, weight: Double, features: Vector) =>
+ Instance(label, weight, features)
+ }
val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
@@ -514,7 +513,6 @@ object LinearRegressionModel extends MLReadable[LinearRegressionModel] {
* :: Experimental ::
* Linear regression training results. Currently, the training summary ignores the
* training coefficients except for the objective trace.
- *
* @param predictions predictions outputted by the model's `transform` method.
* @param objectiveHistory objective function (scaled loss + regularization) at each iteration.
*/
@@ -539,7 +537,6 @@ class LinearRegressionTrainingSummary private[regression] (
/**
* :: Experimental ::
* Linear regression results evaluated on a dataset.
- *
* @param predictions predictions outputted by the model's `transform` method.
*/
@Since("1.5.0")
@@ -554,7 +551,6 @@ class LinearRegressionSummary private[regression] (
@transient private val metrics = new RegressionMetrics(
predictions
.select(predictionCol, labelCol)
- .rdd
.map { case Row(pred: Double, label: Double) => (pred, label) },
!model.getFitIntercept)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index ca0ed95a48..93cf16e6f0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -1052,7 +1052,7 @@ private[python] class PythonMLLibAPI extends Serializable {
* Java stub for the constructor of Python mllib RankingMetrics
*/
def newRankingMetrics(predictionAndLabels: DataFrame): RankingMetrics[Any] = {
- new RankingMetrics(predictionAndLabels.rdd.map(
+ new RankingMetrics(predictionAndLabels.map(
r => (r.getSeq(0).toArray[Any], r.getSeq(1).toArray[Any])))
}
@@ -1135,7 +1135,7 @@ private[python] class PythonMLLibAPI extends Serializable {
def createIndexedRowMatrix(rows: DataFrame, numRows: Long, numCols: Int): IndexedRowMatrix = {
// We use DataFrames for serialization of IndexedRows from Python,
// so map each Row in the DataFrame back to an IndexedRow.
- val indexedRows = rows.rdd.map {
+ val indexedRows = rows.map {
case Row(index: Long, vector: Vector) => IndexedRow(index, vector)
}
new IndexedRowMatrix(indexedRows, numRows, numCols)
@@ -1147,7 +1147,7 @@ private[python] class PythonMLLibAPI extends Serializable {
def createCoordinateMatrix(rows: DataFrame, numRows: Long, numCols: Long): CoordinateMatrix = {
// We use DataFrames for serialization of MatrixEntry entries from
// Python, so map each Row in the DataFrame back to a MatrixEntry.
- val entries = rows.rdd.map {
+ val entries = rows.map {
case Row(i: Long, j: Long, value: Double) => MatrixEntry(i, j, value)
}
new CoordinateMatrix(entries, numRows, numCols)
@@ -1161,7 +1161,7 @@ private[python] class PythonMLLibAPI extends Serializable {
// We use DataFrames for serialization of sub-matrix blocks from
// Python, so map each Row in the DataFrame back to a
// ((blockRowIndex, blockColIndex), sub-matrix) tuple.
- val blockTuples = blocks.rdd.map {
+ val blockTuples = blocks.map {
case Row(Row(blockRowIndex: Long, blockColIndex: Long), subMatrix: Matrix) =>
((blockRowIndex.toInt, blockColIndex.toInt), subMatrix)
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
index 3b91fe8643..26c6235fe5 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
@@ -143,7 +143,7 @@ object KMeansModel extends Loader[KMeansModel] {
val k = (metadata \ "k").extract[Int]
val centroids = sqlContext.read.parquet(Loader.dataPath(path))
Loader.checkSchema[Cluster](centroids.schema)
- val localCentroids = centroids.rdd.map(Cluster.apply).collect()
+ val localCentroids = centroids.map(Cluster.apply).collect()
assert(k == localCentroids.size)
new KMeansModel(localCentroids.sortBy(_.id).map(_.point))
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
index 25d67a3756..b30ecb8020 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -896,11 +896,11 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] {
Loader.checkSchema[EdgeData](edgeDataFrame.schema)
val globalTopicTotals: LDA.TopicCounts =
dataFrame.first().getAs[Vector](0).toBreeze.toDenseVector
- val vertices: RDD[(VertexId, LDA.TopicCounts)] = vertexDataFrame.rdd.map {
+ val vertices: RDD[(VertexId, LDA.TopicCounts)] = vertexDataFrame.map {
case Row(ind: Long, vec: Vector) => (ind, vec.toBreeze.toDenseVector)
}
- val edges: RDD[Edge[LDA.TokenCount]] = edgeDataFrame.rdd.map {
+ val edges: RDD[Edge[LDA.TokenCount]] = edgeDataFrame.map {
case Row(srcId: Long, dstId: Long, prop: Double) => Edge(srcId, dstId, prop)
}
val graph: Graph[LDA.TopicCounts, LDA.TokenCount] = Graph(vertices, edges)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
index 9732dfa174..feacafec79 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
@@ -93,7 +93,7 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode
val assignments = sqlContext.read.parquet(Loader.dataPath(path))
Loader.checkSchema[PowerIterationClustering.Assignment](assignments.schema)
- val assignmentsRDD = assignments.rdd.map {
+ val assignmentsRDD = assignments.map {
case Row(id: Long, cluster: Int) => PowerIterationClustering.Assignment(id, cluster)
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
index 319c54724d..12cf220957 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
@@ -58,7 +58,7 @@ class BinaryClassificationMetrics @Since("1.3.0") (
* @param scoreAndLabels a DataFrame with two double columns: score and label
*/
private[mllib] def this(scoreAndLabels: DataFrame) =
- this(scoreAndLabels.rdd.map(r => (r.getDouble(0), r.getDouble(1))))
+ this(scoreAndLabels.map(r => (r.getDouble(0), r.getDouble(1))))
/**
* Unpersist intermediate RDDs used in the computation.
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala
index 3029b15f58..c5104960cf 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala
@@ -38,7 +38,7 @@ class MulticlassMetrics @Since("1.1.0") (predictionAndLabels: RDD[(Double, Doubl
* @param predictionAndLabels a DataFrame with two double columns: prediction and label
*/
private[mllib] def this(predictionAndLabels: DataFrame) =
- this(predictionAndLabels.rdd.map(r => (r.getDouble(0), r.getDouble(1))))
+ this(predictionAndLabels.map(r => (r.getDouble(0), r.getDouble(1))))
private lazy val labelCountByClass: Map[Double, Long] = predictionAndLabels.values.countByValue()
private lazy val labelCount: Long = labelCountByClass.values.sum
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MultilabelMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MultilabelMetrics.scala
index daf6ff4db4..c100b3c9ec 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MultilabelMetrics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MultilabelMetrics.scala
@@ -35,9 +35,7 @@ class MultilabelMetrics @Since("1.2.0") (predictionAndLabels: RDD[(Array[Double]
* @param predictionAndLabels a DataFrame with two double array columns: prediction and label
*/
private[mllib] def this(predictionAndLabels: DataFrame) =
- this(predictionAndLabels.rdd.map { r =>
- (r.getSeq[Double](0).toArray, r.getSeq[Double](1).toArray)
- })
+ this(predictionAndLabels.map(r => (r.getSeq[Double](0).toArray, r.getSeq[Double](1).toArray)))
private lazy val numDocs: Long = predictionAndLabels.count()
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala
index 0f4c97ec20..18c90b204a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala
@@ -46,7 +46,7 @@ class RegressionMetrics @Since("2.0.0") (
* prediction and observation
*/
private[mllib] def this(predictionAndObservations: DataFrame) =
- this(predictionAndObservations.rdd.map(r => (r.getDouble(0), r.getDouble(1))))
+ this(predictionAndObservations.map(r => (r.getDouble(0), r.getDouble(1))))
/**
* Use MultivariateOnlineSummarizer to calculate summary statistics of observations and errors.
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
index 4f0e13feae..33728bf5d7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
@@ -161,7 +161,7 @@ object ChiSqSelectorModel extends Loader[ChiSqSelectorModel] {
// Check schema explicitly since erasure makes it hard to use match-case for checking.
Loader.checkSchema[Data](dataFrame.schema)
- val features = dataArray.rdd.map {
+ val features = dataArray.map {
case Row(feature: Int) => (feature)
}.collect()
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
index b35d7217d6..85d609386f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
@@ -134,7 +134,7 @@ object FPGrowthModel extends Loader[FPGrowthModel[_]] {
}
def loadImpl[Item: ClassTag](freqItemsets: DataFrame, sample: Item): FPGrowthModel[Item] = {
- val freqItemsetsRDD = freqItemsets.select("items", "freq").rdd.map { x =>
+ val freqItemsetsRDD = freqItemsets.select("items", "freq").map { x =>
val items = x.getAs[Seq[Item]](0).toArray
val freq = x.getLong(1)
new FreqItemset(items, freq)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index c91729a9fd..628cf1dd57 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -369,13 +369,13 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
assert(className == thisClassName)
assert(formatVersion == thisFormatVersion)
val rank = (metadata \ "rank").extract[Int]
- val userFeatures = sqlContext.read.parquet(userPath(path)).rdd.map {
- case Row(id: Int, features: Seq[_]) =>
- (id, features.asInstanceOf[Seq[Double]].toArray)
- }
- val productFeatures = sqlContext.read.parquet(productPath(path)).rdd.map {
- case Row(id: Int, features: Seq[_]) =>
+ val userFeatures = sqlContext.read.parquet(userPath(path))
+ .map { case Row(id: Int, features: Seq[_]) =>
(id, features.asInstanceOf[Seq[Double]].toArray)
+ }
+ val productFeatures = sqlContext.read.parquet(productPath(path))
+ .map { case Row(id: Int, features: Seq[_]) =>
+ (id, features.asInstanceOf[Seq[Double]].toArray)
}
new MatrixFactorizationModel(rank, userFeatures, productFeatures)
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
index ec5d7b9189..89c470d573 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
@@ -247,7 +247,7 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging {
val dataRDD = sqlContext.read.parquet(datapath)
// Check schema explicitly since erasure makes it hard to use match-case for checking.
Loader.checkSchema[NodeData](dataRDD.schema)
- val nodes = dataRDD.rdd.map(NodeData.apply)
+ val nodes = dataRDD.map(NodeData.apply)
// Build node data into a tree.
val trees = constructTrees(nodes)
assert(trees.size == 1,
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
index 59713c382e..feabcee24f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
@@ -473,7 +473,7 @@ private[tree] object TreeEnsembleModel extends Logging {
treeAlgo: String): Array[DecisionTreeModel] = {
val datapath = Loader.dataPath(path)
val sqlContext = SQLContext.getOrCreate(sc)
- val nodes = sqlContext.read.parquet(datapath).rdd.map(NodeData.apply)
+ val nodes = sqlContext.read.parquet(datapath).map(NodeData.apply)
val trees = constructTrees(nodes)
trees.map(new DecisionTreeModel(_, Algo.fromString(treeAlgo)))
}
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
index cfb9bbfd41..972c0868a4 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
@@ -735,7 +735,7 @@ class LogisticRegressionSuite
val model1 = trainer1.fit(binaryDataset)
val model2 = trainer2.fit(binaryDataset)
- val histogram = binaryDataset.rdd.map { case Row(label: Double, features: Vector) => label }
+ val histogram = binaryDataset.map { case Row(label: Double, features: Vector) => label }
.treeAggregate(new MultiClassSummarizer)(
seqOp = (c, v) => (c, v) match {
case (classSummarizer: MultiClassSummarizer, label: Double) => classSummarizer.add(label)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala
index 602b5a8116..a326432d01 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala
@@ -76,9 +76,8 @@ class MultilayerPerceptronClassifierSuite extends SparkFunSuite with MLlibTestSp
val model = trainer.fit(dataFrame)
val numFeatures = dataFrame.select("features").first().getAs[Vector](0).size
assert(model.numFeatures === numFeatures)
- val mlpPredictionAndLabels = model.transform(dataFrame).select("prediction", "label").rdd.map {
- case Row(p: Double, l: Double) => (p, l)
- }
+ val mlpPredictionAndLabels = model.transform(dataFrame).select("prediction", "label")
+ .map { case Row(p: Double, l: Double) => (p, l) }
// train multinomial logistic regression
val lr = new LogisticRegressionWithLBFGS()
.setIntercept(true)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
index 2ae74a2090..445e50d867 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
@@ -81,9 +81,9 @@ class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext {
val predictionColSchema = transformedDataset.schema(ovaModel.getPredictionCol)
assert(MetadataUtils.getNumClasses(predictionColSchema) === Some(3))
- val ovaResults = transformedDataset.select("prediction", "label").rdd.map {
- row => (row.getDouble(0), row.getDouble(1))
- }
+ val ovaResults = transformedDataset
+ .select("prediction", "label")
+ .map(row => (row.getDouble(0), row.getDouble(1)))
val lr = new LogisticRegressionWithLBFGS().setIntercept(true).setNumClasses(numClasses)
lr.optimizer.setRegParam(0.1).setNumIterations(100)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
index b719a8c7e7..fc4a4add5d 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
@@ -77,8 +77,7 @@ class BisectingKMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
expectedColumns.foreach { column =>
assert(transformed.columns.contains(column))
}
- val clusters =
- transformed.select(predictionColName).rdd.map(_.getInt(0)).distinct().collect().toSet
+ val clusters = transformed.select(predictionColName).map(_.getInt(0)).distinct().collect().toSet
assert(clusters.size === k)
assert(clusters === Set(0, 1, 2, 3, 4))
assert(model.computeCost(dataset) < 0.1)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
index c684bc11cc..e5357ba8e2 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
@@ -93,8 +93,7 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultR
expectedColumns.foreach { column =>
assert(transformed.columns.contains(column))
}
- val clusters =
- transformed.select(predictionColName).rdd.map(_.getInt(0)).distinct().collect().toSet
+ val clusters = transformed.select(predictionColName).map(_.getInt(0)).distinct().collect().toSet
assert(clusters.size === k)
assert(clusters === Set(0, 1, 2, 3, 4))
assert(model.computeCost(dataset) < 0.1)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
index 03270401ad..97dbfd9a43 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
@@ -199,7 +199,7 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
// describeTopics
val topics = model.describeTopics(3)
assert(topics.count() === k)
- assert(topics.select("topic").rdd.map(_.getInt(0)).collect().toSet === Range(0, k).toSet)
+ assert(topics.select("topic").map(_.getInt(0)).collect().toSet === Range(0, k).toSet)
topics.select("termIndices").collect().foreach { case r: Row =>
val termIndices = r.getAs[Seq[Int]](0)
assert(termIndices.length === 3 && termIndices.toSet.size === 3)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala
index e238b33ed8..76d12050f9 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala
@@ -51,7 +51,7 @@ class OneHotEncoderSuite
.setDropLast(false)
val encoded = encoder.transform(transformed)
- val output = encoded.select("id", "labelVec").rdd.map { r =>
+ val output = encoded.select("id", "labelVec").map { r =>
val vec = r.getAs[Vector](1)
(r.getInt(0), vec(0), vec(1), vec(2))
}.collect().toSet
@@ -68,7 +68,7 @@ class OneHotEncoderSuite
.setOutputCol("labelVec")
val encoded = encoder.transform(transformed)
- val output = encoded.select("id", "labelVec").rdd.map { r =>
+ val output = encoded.select("id", "labelVec").map { r =>
val vec = r.getAs[Vector](1)
(r.getInt(0), vec(0), vec(1))
}.collect().toSet
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
index dea61648f2..5d199ca9b5 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
@@ -52,7 +52,7 @@ class StringIndexerSuite
val attr = Attribute.fromStructField(transformed.schema("labelIndex"))
.asInstanceOf[NominalAttribute]
assert(attr.values.get === Array("a", "c", "b"))
- val output = transformed.select("id", "labelIndex").rdd.map { r =>
+ val output = transformed.select("id", "labelIndex").map { r =>
(r.getInt(0), r.getDouble(1))
}.collect().toSet
// a -> 0, b -> 2, c -> 1
@@ -83,7 +83,7 @@ class StringIndexerSuite
val attr = Attribute.fromStructField(transformed.schema("labelIndex"))
.asInstanceOf[NominalAttribute]
assert(attr.values.get === Array("b", "a"))
- val output = transformed.select("id", "labelIndex").rdd.map { r =>
+ val output = transformed.select("id", "labelIndex").map { r =>
(r.getInt(0), r.getDouble(1))
}.collect().toSet
// a -> 1, b -> 0
@@ -102,7 +102,7 @@ class StringIndexerSuite
val attr = Attribute.fromStructField(transformed.schema("labelIndex"))
.asInstanceOf[NominalAttribute]
assert(attr.values.get === Array("100", "300", "200"))
- val output = transformed.select("id", "labelIndex").rdd.map { r =>
+ val output = transformed.select("id", "labelIndex").map { r =>
(r.getInt(0), r.getDouble(1))
}.collect().toSet
// 100 -> 0, 200 -> 2, 300 -> 1
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala
index d4f836ef33..67817fa4ba 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala
@@ -159,7 +159,7 @@ class VectorIndexerSuite extends SparkFunSuite with MLlibTestSparkContext
// Chose correct categorical features
assert(categoryMaps.keys.toSet === categoricalFeatures)
val transformed = model.transform(data).select("indexed")
- val indexedRDD: RDD[Vector] = transformed.rdd.map(_.getAs[Vector](0))
+ val indexedRDD: RDD[Vector] = transformed.map(_.getAs[Vector](0))
val featureAttrs = AttributeGroup.fromStructField(transformed.schema("indexed"))
assert(featureAttrs.name === "indexed")
assert(featureAttrs.attributes.get.length === model.numFeatures)
@@ -216,8 +216,7 @@ class VectorIndexerSuite extends SparkFunSuite with MLlibTestSparkContext
val points = data.collect().map(_.getAs[Vector](0))
val vectorIndexer = getIndexer.setMaxCategories(maxCategories)
val model = vectorIndexer.fit(data)
- val indexedPoints =
- model.transform(data).select("indexed").rdd.map(_.getAs[Vector](0)).collect()
+ val indexedPoints = model.transform(data).select("indexed").map(_.getAs[Vector](0)).collect()
points.zip(indexedPoints).foreach {
case (orig: SparseVector, indexed: SparseVector) =>
assert(orig.indices.length == indexed.indices.length)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala
index 1671fb6f3a..f094c550e5 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala
@@ -100,7 +100,7 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
.setSeed(42L)
.fit(docDF)
- val realVectors = model.getVectors.sort("word").select("vector").rdd.map {
+ val realVectors = model.getVectors.sort("word").select("vector").map {
case Row(v: Vector) => v
}.collect()
// These expectations are just magic values, characterizing the current
@@ -134,7 +134,7 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
.fit(docDF)
val expectedSimilarity = Array(0.2608488929093532, -0.8271274846926078)
- val (synonyms, similarity) = model.findSynonyms("a", 2).rdd.map {
+ val (synonyms, similarity) = model.findSynonyms("a", 2).map {
case Row(w: String, sim: Double) => (w, sim)
}.collect().unzip
@@ -161,7 +161,7 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
.setSeed(42L)
.fit(docDF)
- val (synonyms, similarity) = model.findSynonyms("a", 6).rdd.map {
+ val (synonyms, similarity) = model.findSynonyms("a", 6).map {
case Row(w: String, sim: Double) => (w, sim)
}.collect().unzip
@@ -174,7 +174,7 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
.setWindowSize(10)
.fit(docDF)
- val (synonymsLarger, similarityLarger) = model.findSynonyms("a", 6).rdd.map {
+ val (synonymsLarger, similarityLarger) = model.findSynonyms("a", 6).map {
case Row(w: String, sim: Double) => (w, sim)
}.collect().unzip
// The similarity score should be very different with the larger window
diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index 2bedd70ce9..ff0d8f5568 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -342,10 +342,11 @@ class ALSSuite
.setSeed(0)
val alpha = als.getAlpha
val model = als.fit(training.toDF())
- val predictions = model.transform(test.toDF()).select("rating", "prediction").rdd.map {
- case Row(rating: Float, prediction: Float) =>
+ val predictions = model.transform(test.toDF())
+ .select("rating", "prediction")
+ .map { case Row(rating: Float, prediction: Float) =>
(rating.toDouble, prediction.toDouble)
- }
+ }
val rmse =
if (implicitPrefs) {
// TODO: Use a better (rank-based?) evaluation metric for implicit feedback.
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala
index 244db8637b..09326600e6 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala
@@ -87,7 +87,7 @@ class GBTRegressorSuite extends SparkFunSuite with MLlibTestSparkContext {
// copied model must have the same parent.
MLTestingUtils.checkCopy(model)
val preds = model.transform(df)
- val predictions = preds.select("prediction").rdd.map(_.getDouble(0))
+ val predictions = preds.select("prediction").map(_.getDouble(0))
// Checks based on SPARK-8736 (to ensure it is not doing classification)
assert(predictions.max() > 2)
assert(predictions.min() < -1)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala
index b8874b4cd3..f067c29d27 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala
@@ -46,7 +46,7 @@ class IsotonicRegressionSuite
val predictions = model
.transform(dataset)
- .select("prediction").rdd.map { case Row(pred) =>
+ .select("prediction").map { case Row(pred) =>
pred
}.collect()
@@ -66,7 +66,7 @@ class IsotonicRegressionSuite
val predictions = model
.transform(features)
- .select("prediction").rdd.map {
+ .select("prediction").map {
case Row(pred) => pred
}.collect()
@@ -160,7 +160,7 @@ class IsotonicRegressionSuite
val predictions = model
.transform(features)
- .select("prediction").rdd.map {
+ .select("prediction").map {
case Row(pred) => pred
}.collect()
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
index 9dee04c877..3ae108d822 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
@@ -686,18 +686,17 @@ class LinearRegressionSuite
// Residuals in [[LinearRegressionResults]] should equal those manually computed
val expectedResiduals = datasetWithDenseFeature.select("features", "label")
- .rdd
.map { case Row(features: DenseVector, label: Double) =>
- val prediction =
- features(0) * model.coefficients(0) + features(1) * model.coefficients(1) +
- model.intercept
- label - prediction
- }
- .zip(model.summary.residuals.rdd.map(_.getDouble(0)))
+ val prediction =
+ features(0) * model.coefficients(0) + features(1) * model.coefficients(1) +
+ model.intercept
+ label - prediction
+ }
+ .zip(model.summary.residuals.map(_.getDouble(0)))
.collect()
.foreach { case (manualResidual: Double, resultResidual: Double) =>
- assert(manualResidual ~== resultResidual relTol 1E-5)
- }
+ assert(manualResidual ~== resultResidual relTol 1E-5)
+ }
/*
# Use the following R code to generate model training results.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index be902d688e..abb8fe552b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1427,6 +1427,48 @@ class DataFrame private[sql](
def transform[U](t: DataFrame => DataFrame): DataFrame = t(this)
/**
+ * Returns a new RDD by applying a function to all rows of this DataFrame.
+ * @group rdd
+ * @since 1.3.0
+ */
+ def map[R: ClassTag](f: Row => R): RDD[R] = rdd.map(f)
+
+ /**
+ * Returns a new RDD by first applying a function to all rows of this [[DataFrame]],
+ * and then flattening the results.
+ * @group rdd
+ * @since 1.3.0
+ */
+ def flatMap[R: ClassTag](f: Row => TraversableOnce[R]): RDD[R] = rdd.flatMap(f)
+
+ /**
+ * Returns a new RDD by applying a function to each partition of this DataFrame.
+ * @group rdd
+ * @since 1.3.0
+ */
+ def mapPartitions[R: ClassTag](f: Iterator[Row] => Iterator[R]): RDD[R] = {
+ rdd.mapPartitions(f)
+ }
+
+ /**
+ * Applies a function `f` to all rows.
+ * @group rdd
+ * @since 1.3.0
+ */
+ def foreach(f: Row => Unit): Unit = withNewExecutionId {
+ rdd.foreach(f)
+ }
+
+ /**
+ * Applies a function f to each partition of this [[DataFrame]].
+ * @group rdd
+ * @since 1.3.0
+ */
+ def foreachPartition(f: Iterator[Row] => Unit): Unit = withNewExecutionId {
+ rdd.foreachPartition(f)
+ }
+
+ /**
* Returns the first `n` rows in the [[DataFrame]].
*
* Running take requires moving data into the application's driver process, and doing so with
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
index a7258d742a..f06d16116e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
@@ -306,7 +306,6 @@ class GroupedData protected[sql](
val values = df.select(pivotColumn)
.distinct()
.sort(pivotColumn) // ensure that the output columns are in a consistent logical order
- .rdd
.map(_.get(0))
.take(maxValues + 1)
.toSeq
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index 68a251757c..d912aeb70d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -100,7 +100,7 @@ private[r] object SQLUtils {
}
def dfToRowRDD(df: DataFrame): JavaRDD[Array[Byte]] = {
- df.rdd.map(r => rowToRBytes(r))
+ df.map(r => rowToRBytes(r))
}
private[this] def doConversion(data: Object, dataType: DataType): Object = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index d7111a6a1c..e295722cac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -276,7 +276,7 @@ object JdbcUtils extends Logging {
val rddSchema = df.schema
val getConnection: () => Connection = createConnectionFactory(url, properties)
val batchSize = properties.getProperty("batchsize", "1000").toInt
- df.rdd.foreachPartition { iterator =>
+ df.foreachPartition { iterator =>
savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 7d96ef6fe0..f54bff9f18 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -257,7 +257,7 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
}
test("count") {
- assert(testData2.count() === testData2.rdd.map(_ => 1).count())
+ assert(testData2.count() === testData2.map(_ => 1).count())
checkAnswer(
testData2.agg(count('a), sumDistinct('a)), // non-partial
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index bd51154c58..fbffe867e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -101,7 +101,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
(implicit df: DataFrame): Unit = {
def checkBinaryAnswer(df: DataFrame, expected: Seq[Row]) = {
assertResult(expected.map(_.getAs[Array[Byte]](0).mkString(",")).sorted) {
- df.rdd.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq.sorted
+ df.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq.sorted
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index c85eeddc2c..3c74464d57 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -599,7 +599,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
test("null and non-null strings") {
// Create a dataset where the first values are NULL and then some non-null values. The
// number of non-nulls needs to be bigger than the ParquetReader batch size.
- val data = sqlContext.range(200).rdd.map { i =>
+ val data = sqlContext.range(200).map { i =>
if (i.getLong(0) < 150) Row(None)
else Row("a")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 39920d8cc6..085e4a49a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -330,7 +330,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
// listener should ignore the non SQL stage
assert(sqlContext.listener.stageIdToStageMetrics.size == previousStageNumber)
- sqlContext.sparkContext.parallelize(1 to 10).toDF().rdd.foreach(i => ())
+ sqlContext.sparkContext.parallelize(1 to 10).toDF().foreach(i => ())
sqlContext.sparkContext.listenerBus.waitUntilEmpty(10000)
// listener should save the SQL stage
assert(sqlContext.listener.stageIdToStageMetrics.size == previousStageNumber + 1)
@@ -398,7 +398,7 @@ class SQLListenerMemoryLeakSuite extends SparkFunSuite {
).toDF()
df.collect()
try {
- df.rdd.foreach(_ => throw new RuntimeException("Oops"))
+ df.foreach(_ => throw new RuntimeException("Oops"))
} catch {
case e: SparkException => // This is expected for a failed job
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 12a5542bd4..f141a9bd0f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -210,7 +210,7 @@ object SparkSubmitClassLoaderTest extends Logging {
}
// Second, we load classes at the executor side.
logInfo("Testing load classes at the executor side.")
- val result = df.rdd.mapPartitions { x =>
+ val result = df.mapPartitions { x =>
var exception: String = null
try {
Utils.classForName(args(0))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 1002487447..3208ebc9ff 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -664,13 +664,11 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
test("implement identity function using case statement") {
val actual = sql("SELECT (CASE key WHEN key THEN key END) FROM src")
- .rdd
.map { case Row(i: Int) => i }
.collect()
.toSet
val expected = sql("SELECT key FROM src")
- .rdd
.map { case Row(i: Int) => i }
.collect()
.toSet
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 68249517f5..b11d1d9de0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -119,7 +119,6 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
// expr = (not leaf-0)
assertResult(10) {
sql("SELECT name, contacts FROM t where age > 5")
- .rdd
.flatMap(_.getAs[Seq[_]]("contacts"))
.count()
}
@@ -132,7 +131,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
val df = sql("SELECT name, contacts FROM t WHERE age > 5 AND age < 8")
assert(df.count() === 2)
assertResult(4) {
- df.rdd.flatMap(_.getAs[Seq[_]]("contacts")).count()
+ df.flatMap(_.getAs[Seq[_]]("contacts")).count()
}
}
@@ -144,7 +143,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
val df = sql("SELECT name, contacts FROM t WHERE age < 2 OR age > 8")
assert(df.count() === 3)
assertResult(6) {
- df.rdd.flatMap(_.getAs[Seq[_]]("contacts")).count()
+ df.flatMap(_.getAs[Seq[_]]("contacts")).count()
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index a127cf6e4b..68d5c7da1f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -854,7 +854,7 @@ abstract class ParquetPartitioningTest extends QueryTest with SQLTestUtils with
test(s"hive udfs $table") {
checkAnswer(
sql(s"SELECT concat(stringField, stringField) FROM $table"),
- sql(s"SELECT stringField FROM $table").rdd.map {
+ sql(s"SELECT stringField FROM $table").map {
case Row(s: String) => Row(s + s)
}.collect().toSeq)
}