author     Reynold Xin <rxin@databricks.com>  2015-02-13 23:03:22 -0800
committer  Reynold Xin <rxin@databricks.com>  2015-02-13 23:03:22 -0800
commit     e98dfe627c5d0201464cdd0f363f391ea84c389a (patch)
tree       794beea739eb04bf2e0926f9b0e19ffacb94ba08 /examples
parent     0ce4e430a81532dc317136f968f28742e087d840 (diff)
[SPARK-5752][SQL] Don't implicitly convert RDDs directly to DataFrames
- The old implicit would convert RDDs directly to DataFrames, and that added too many methods.
- toDataFrame -> toDF
- Dsl -> functions
- implicits moved into SQLContext.implicits
- addColumn -> withColumn
- renameColumn -> withColumnRenamed

Python changes:
- toDataFrame -> toDF
- Dsl -> functions package
- addColumn -> withColumn
- renameColumn -> withColumnRenamed
- add toDF functions to RDD on SQLContext init
- add flatMap to DataFrame

Author: Reynold Xin <rxin@databricks.com>
Author: Davies Liu <davies@databricks.com>

Closes #4556 from rxin/SPARK-5752 and squashes the following commits:

5ef9910 [Reynold Xin] More fix
61d3fca [Reynold Xin] Merge branch 'df5' of github.com:davies/spark into SPARK-5752
ff5832c [Reynold Xin] Fix python
749c675 [Reynold Xin] count(*) fixes.
5806df0 [Reynold Xin] Fix build break again.
d941f3d [Reynold Xin] Fixed explode compilation break.
fe1267a [Davies Liu] flatMap
c4afb8e [Reynold Xin] style
d9de47f [Davies Liu] add comment
b783994 [Davies Liu] add comment for toDF
e2154e5 [Davies Liu] schema() -> schema
3a1004f [Davies Liu] Dsl -> functions, toDF()
fb256af [Reynold Xin] - toDataFrame -> toDF - Dsl -> functions - implicits moved into SQLContext.implicits - addColumn -> withColumn - renameColumn -> withColumnRenamed
0dd74eb [Reynold Xin] [SPARK-5752][SQL] Don't implicitly convert RDDs directly to DataFrames
97dd47c [Davies Liu] fix mistake
6168f74 [Davies Liu] fix test
1fc0199 [Davies Liu] fix test
a075cd5 [Davies Liu] clean up, toPandas
663d314 [Davies Liu] add test for agg('*')
9e214d5 [Reynold Xin] count(*) fixes.
1ed7136 [Reynold Xin] Fix build break again.
921b2e3 [Reynold Xin] Fixed explode compilation break.
14698d4 [Davies Liu] flatMap
ba3e12d [Reynold Xin] style
d08c92d [Davies Liu] add comment
5c8b524 [Davies Liu] add comment for toDF
a4e5e66 [Davies Liu] schema() -> schema
d377fc9 [Davies Liu] Dsl -> functions, toDF()
6b3086c [Reynold Xin] - toDataFrame -> toDF - Dsl -> functions - implicits moved into SQLContext.implicits - addColumn -> withColumn - renameColumn -> withColumnRenamed
807e8b1 [Reynold Xin] [SPARK-5752][SQL] Don't implicitly convert RDDs directly to DataFrames
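For orientation, a minimal Scala sketch of how example code looks after this change (not taken from the patch; the object, class, and column names Record, key, value, keyPlusOne, label are illustrative only, and it assumes the Spark 1.3-era API this commit targets): the RDD is converted explicitly with toDF via import sqlContext.implicits._, DSL helpers come from org.apache.spark.sql.functions instead of Dsl, and columns are added or renamed with withColumn / withColumnRenamed.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    // Dsl._ is gone; the DSL helpers now live in functions._
    import org.apache.spark.sql.functions._

    case class Record(key: Int, value: String)

    object ToDFSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("ToDFSketch"))
        val sqlContext = new SQLContext(sc)
        // The RDD -> DataFrame conversion is no longer implicit; import the
        // SQLContext's implicits and call toDF explicitly.
        import sqlContext.implicits._

        val rdd = sc.parallelize((1 to 10).map(i => Record(i, s"val_$i")))
        val df = rdd.toDF                       // was: implicit conversion / toDataFrame

        // addColumn -> withColumn, renameColumn -> withColumnRenamed
        val df2 = df.withColumn("keyPlusOne", df("key") + 1)
                    .withColumnRenamed("value", "label")

        df2.select(avg("key")).show()           // avg comes from functions._
        sc.stop()
      }
    }

The Python API gets the matching renames (toDF, withColumn, withColumnRenamed, a functions package, plus flatMap on DataFrame), but no Python sketch is shown here since the examples touched by this patch are all Scala.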
Diffstat (limited to 'examples')
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala            |  4
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala              |  4
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala                     |  6
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala              |  6
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala |  4
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala                |  8
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala                     | 10
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala              |  2
8 files changed, 22 insertions, 22 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala
index a2893f78e0..f0241943ef 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala
@@ -90,7 +90,7 @@ object CrossValidatorExample {
crossval.setNumFolds(2) // Use 3+ in practice
// Run cross-validation, and choose the best set of parameters.
- val cvModel = crossval.fit(training)
+ val cvModel = crossval.fit(training.toDF)
// Prepare test documents, which are unlabeled.
val test = sc.parallelize(Seq(
@@ -100,7 +100,7 @@ object CrossValidatorExample {
Document(7L, "apache hadoop")))
// Make predictions on test documents. cvModel uses the best model found (lrModel).
- cvModel.transform(test)
+ cvModel.transform(test.toDF)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala
index aed4423893..54aadd2288 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala
@@ -58,7 +58,7 @@ object DeveloperApiExample {
lr.setMaxIter(10)
// Learn a LogisticRegression model. This uses the parameters stored in lr.
- val model = lr.fit(training)
+ val model = lr.fit(training.toDF)
// Prepare test data.
val test = sc.parallelize(Seq(
@@ -67,7 +67,7 @@ object DeveloperApiExample {
LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5))))
// Make predictions on test data.
- val sumPredictions: Double = model.transform(test)
+ val sumPredictions: Double = model.transform(test.toDF)
.select("features", "label", "prediction")
.collect()
.map { case Row(features: Vector, label: Double, prediction: Double) =>
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
index 836ea2e012..adaf796dc1 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
@@ -137,9 +137,9 @@ object MovieLensALS {
.setRegParam(params.regParam)
.setNumBlocks(params.numBlocks)
- val model = als.fit(training)
+ val model = als.fit(training.toDF)
- val predictions = model.transform(test).cache()
+ val predictions = model.transform(test.toDF).cache()
// Evaluate the model.
// TODO: Create an evaluator to compute RMSE.
@@ -158,7 +158,7 @@ object MovieLensALS {
// Inspect false positives.
predictions.registerTempTable("prediction")
- sc.textFile(params.movies).map(Movie.parseMovie).registerTempTable("movie")
+ sc.textFile(params.movies).map(Movie.parseMovie).toDF.registerTempTable("movie")
sqlContext.sql(
"""
|SELECT userId, prediction.movieId, title, rating, prediction
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala
index 80c9f5ff57..c5bb5515b1 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala
@@ -58,7 +58,7 @@ object SimpleParamsExample {
.setRegParam(0.01)
// Learn a LogisticRegression model. This uses the parameters stored in lr.
- val model1 = lr.fit(training)
+ val model1 = lr.fit(training.toDF)
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
@@ -77,7 +77,7 @@ object SimpleParamsExample {
// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
- val model2 = lr.fit(training, paramMapCombined)
+ val model2 = lr.fit(training.toDF, paramMapCombined)
println("Model 2 was fit using parameters: " + model2.fittingParamMap)
// Prepare test data.
@@ -90,7 +90,7 @@ object SimpleParamsExample {
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
- model2.transform(test)
+ model2.transform(test.toDF)
.select("features", "label", "myProbability", "prediction")
.collect()
.foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala b/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala
index 968cb29212..8b47f88e48 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala
@@ -69,7 +69,7 @@ object SimpleTextClassificationPipeline {
.setStages(Array(tokenizer, hashingTF, lr))
// Fit the pipeline to training documents.
- val model = pipeline.fit(training)
+ val model = pipeline.fit(training.toDF)
// Prepare test documents, which are unlabeled.
val test = sc.parallelize(Seq(
@@ -79,7 +79,7 @@ object SimpleTextClassificationPipeline {
Document(7L, "apache hadoop")))
// Make predictions on test documents.
- model.transform(test)
+ model.transform(test.toDF)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala
index 89b6255991..c98c68a02f 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala
@@ -81,18 +81,18 @@ object DatasetExample {
println(s"Loaded ${origData.count()} instances from file: ${params.input}")
// Convert input data to DataFrame explicitly.
- val df: DataFrame = origData.toDataFrame
+ val df: DataFrame = origData.toDF
println(s"Inferred schema:\n${df.schema.prettyJson}")
println(s"Converted to DataFrame with ${df.count()} records")
- // Select columns, using implicit conversion to DataFrames.
- val labelsDf: DataFrame = origData.select("label")
+ // Select columns
+ val labelsDf: DataFrame = df.select("label")
val labels: RDD[Double] = labelsDf.map { case Row(v: Double) => v }
val numLabels = labels.count()
val meanLabel = labels.fold(0.0)(_ + _) / numLabels
println(s"Selected label column with average value $meanLabel")
- val featuresDf: DataFrame = origData.select("features")
+ val featuresDf: DataFrame = df.select("features")
val features: RDD[Vector] = featuresDf.map { case Row(v: Vector) => v }
val featureSummary = features.aggregate(new MultivariateOnlineSummarizer())(
(summary, feat) => summary.add(feat),
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
index 1eac3c8d03..79d3d5a24c 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
@@ -19,7 +19,7 @@ package org.apache.spark.examples.sql
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.Dsl._
+import org.apache.spark.sql.functions._
// One method for defining the schema of an RDD is to make a case class with the desired column
// names and types.
@@ -34,10 +34,10 @@ object RDDRelation {
// Importing the SQL context gives access to all the SQL functions and implicit conversions.
import sqlContext.implicits._
- val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
+ val df = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))).toDF
// Any RDD containing case classes can be registered as a table. The schema of the table is
// automatically inferred using scala reflection.
- rdd.registerTempTable("records")
+ df.registerTempTable("records")
// Once tables have been registered, you can run SQL queries over them.
println("Result of SELECT *:")
@@ -55,10 +55,10 @@ object RDDRelation {
rddFromSql.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect().foreach(println)
// Queries can also be written using a LINQ-like Scala DSL.
- rdd.where($"key" === 1).orderBy($"value".asc).select($"key").collect().foreach(println)
+ df.where($"key" === 1).orderBy($"value".asc).select($"key").collect().foreach(println)
// Write out an RDD as a parquet file.
- rdd.saveAsParquetFile("pair.parquet")
+ df.saveAsParquetFile("pair.parquet")
// Read in parquet file. Parquet files are self-describing so the schmema is preserved.
val parquetFile = sqlContext.parquetFile("pair.parquet")
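The RDDRelation changes above all follow the same pattern; as a compact, self-contained sketch of it (assuming the Spark 1.3-era API; the Person class and its data are made up for illustration): convert the case-class RDD explicitly with toDF, register it for SQL, and use the $-column DSL that is now enabled by sqlContext.implicits._ together with org.apache.spark.sql.functions._ rather than Dsl._.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.functions._    // formerly org.apache.spark.sql.Dsl._

    case class Person(name: String, age: Int)  // illustrative schema

    object DslSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("DslSketch"))
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._          // enables rdd.toDF and the $"col" syntax

        // RDDs no longer convert implicitly, so call toDF before registering.
        val people = sc.parallelize(Seq(Person("alice", 29), Person("bob", 41))).toDF
        people.registerTempTable("people")

        // SQL and the column DSL return the same rows.
        sqlContext.sql("SELECT name FROM people WHERE age > 30").collect().foreach(println)
        people.where($"age" > 30).select($"name").collect().foreach(println)

        sc.stop()
      }
    }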
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
index 15754cdfcc..7128deba54 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
@@ -68,7 +68,7 @@ object HiveFromSpark {
// You can also register RDDs as temporary tables within a HiveContext.
val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
- rdd.registerTempTable("records")
+ rdd.toDF.registerTempTable("records")
// Queries can then join RDD data with data stored in Hive.
println("Result of SELECT *:")