author     Cheng Hao <hao.cheng@intel.com>    2016-03-16 11:20:15 -0700
committer  Reynold Xin <rxin@databricks.com>  2016-03-16 11:20:15 -0700
commit     d9670f84739b0840501b19b8cb0e851850edb8c1 (patch)
tree       a664b7f0ca4e64b025408d770412dd8be8c07422
parent     d9e8f26d0334f393e3b02d7a3b607be54a2a5efe (diff)
[SPARK-13894][SQL] SqlContext.range return type from DataFrame to DataSet
## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-13894

Change the return type of the `SQLContext.range` API from `DataFrame` to `Dataset`.

## How was this patch tested?

No additional unit test required.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #11730 from chenghao-intel/range.
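For callers, the practical effect of this patch is that `range` now returns a typed `Dataset[Long]` instead of a `DataFrame`, so call sites that depended on DataFrame-specific behaviour (as in the test changes below) add an explicit `.toDF()`. A minimal sketch of the usage after this patch, assuming only the API shown in this diff (local names are illustrative):

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SQLContext}

// Assuming an existing SQLContext, as in the test suites touched below.
def rangeAfterThisPatch(sqlContext: SQLContext): Unit = {
  // range now yields a typed Dataset of Longs ...
  val ds: Dataset[Long] = sqlContext.range(0L, 10L)

  // ... so typed functional operations work without unwrapping Rows,
  val evens: Dataset[Long] = ds.filter((x: Long) => x % 2 == 0)

  // while call sites that still need a DataFrame convert explicitly.
  val df: DataFrame = ds.toDF()
  df.select("id").show()
}
```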
-rw-r--r--  mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala                          2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                                     33
-rw-r--r--  sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java                           4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala                                  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala                                   2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala                2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala   10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala                8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala                         2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala                    4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala            2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala                  2
12 files changed, 38 insertions(+), 37 deletions(-)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
index b9533f881d..d40e69dced 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
@@ -114,7 +114,7 @@ class StringIndexerSuite
val indexerModel = new StringIndexerModel("indexer", Array("a", "b", "c"))
.setInputCol("label")
.setOutputCol("labelIndex")
- val df = sqlContext.range(0L, 10L)
+ val df = sqlContext.range(0L, 10L).toDF()
assert(indexerModel.transform(df).eq(df))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 177d78c4c0..e4d9308692 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -60,6 +60,7 @@ import org.apache.spark.util.Utils
* @groupname specificdata Specific Data Sources
* @groupname config Configuration
* @groupname dataframes Custom DataFrame Creation
+ * @groupname dataset Custom DataFrame Creation
* @groupname Ungrouped Support functions for language integrated queries
* @since 1.0.0
*/
@@ -716,53 +717,53 @@ class SQLContext private[sql](
/**
* :: Experimental ::
- * Creates a [[DataFrame]] with a single [[LongType]] column named `id`, containing elements
+ * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements
* in an range from 0 to `end` (exclusive) with step value 1.
*
- * @since 1.4.1
- * @group dataframe
+ * @since 2.0.0
+ * @group dataset
*/
@Experimental
- def range(end: Long): DataFrame = range(0, end)
+ def range(end: Long): Dataset[Long] = range(0, end)
/**
* :: Experimental ::
- * Creates a [[DataFrame]] with a single [[LongType]] column named `id`, containing elements
+ * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements
* in an range from `start` to `end` (exclusive) with step value 1.
*
- * @since 1.4.0
- * @group dataframe
+ * @since 2.0.0
+ * @group dataset
*/
@Experimental
- def range(start: Long, end: Long): DataFrame = {
+ def range(start: Long, end: Long): Dataset[Long] = {
range(start, end, step = 1, numPartitions = sparkContext.defaultParallelism)
}
/**
* :: Experimental ::
- * Creates a [[DataFrame]] with a single [[LongType]] column named `id`, containing elements
+ * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements
* in an range from `start` to `end` (exclusive) with an step value.
*
* @since 2.0.0
- * @group dataframe
+ * @group dataset
*/
@Experimental
- def range(start: Long, end: Long, step: Long): DataFrame = {
+ def range(start: Long, end: Long, step: Long): Dataset[Long] = {
range(start, end, step, numPartitions = sparkContext.defaultParallelism)
}
/**
* :: Experimental ::
- * Creates a [[DataFrame]] with a single [[LongType]] column named `id`, containing elements
+ * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements
* in an range from `start` to `end` (exclusive) with an step value, with partition number
* specified.
*
- * @since 1.4.0
- * @group dataframe
+ * @since 2.0.0
+ * @group dataset
*/
@Experimental
- def range(start: Long, end: Long, step: Long, numPartitions: Int): DataFrame = {
- Dataset.newDataFrame(this, Range(start, end, step, numPartitions))
+ def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[Long] = {
+ new Dataset(this, Range(start, end, step, numPartitions), implicits.newLongEncoder)
}
/**
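Taken together, the four overloads above now all funnel into the `Dataset[Long]` constructor with the `Long` encoder; only the defaults for start, step, and partition count differ. A small sketch of the resulting call shapes, assuming an in-scope `SQLContext` (literal values are illustrative):

```scala
import org.apache.spark.sql.{Dataset, SQLContext}

// Assuming an in-scope SQLContext, the four overloads compose as follows.
def rangeShapes(sqlContext: SQLContext): Unit = {
  val a: Dataset[Long] = sqlContext.range(5)            // 0, 1, 2, 3, 4
  val b: Dataset[Long] = sqlContext.range(2, 5)         // 2, 3, 4
  val c: Dataset[Long] = sqlContext.range(0, 10, 2)     // 0, 2, 4, 6, 8
  val d: Dataset[Long] = sqlContext.range(0, 10, 2, numPartitions = 4)

  // collect() on a Dataset[Long] yields Array[Long], no Row unwrapping needed.
  assert(c.collect().sorted.sameElements(Array(0L, 2L, 4L, 6L, 8L)))
}
```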
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index 7fe17e0cf7..f3c5a86e20 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -328,7 +328,7 @@ public class JavaDataFrameSuite {
@Test
public void testCountMinSketch() {
- Dataset<Row> df = context.range(1000);
+ Dataset df = context.range(1000);
CountMinSketch sketch1 = df.stat().countMinSketch("id", 10, 20, 42);
Assert.assertEquals(sketch1.totalCount(), 1000);
@@ -353,7 +353,7 @@ public class JavaDataFrameSuite {
@Test
public void testBloomFilter() {
- Dataset<Row> df = context.range(1000);
+ Dataset df = context.range(1000);
BloomFilter filter1 = df.stat().bloomFilter("id", 1000, 0.03);
Assert.assertTrue(filter1.expectedFpp() - 0.03 < 1e-3);
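The Java suite drops the `Dataset<Row>` element type because `range` now hands back the Scala-typed Dataset; the column-name-based stat functions keep working since the single column is still named `id`. A hedged Scala sketch of the equivalent assertions (mirroring, not taken from, the Java test above):

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.util.sketch.{BloomFilter, CountMinSketch}

// Scala counterpart of the Java assertions; the single column is still named "id".
def sketchesOverRange(sqlContext: SQLContext): Unit = {
  val df = sqlContext.range(1000).toDF()

  // Count-min sketch over the id column (depth 10, width 20, seed 42).
  val cms: CountMinSketch = df.stat.countMinSketch("id", 10, 20, 42)
  assert(cms.totalCount() == 1000L)

  // Bloom filter sized for 1000 items with a 3% target false-positive rate.
  val bf: BloomFilter = df.stat.bloomFilter("id", 1000, 0.03)
  assert(bf.expectedFpp() - 0.03 < 1e-3)
}
```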
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 2333fa27ca..199e138abf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -344,7 +344,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
// SPARK-12340: overstep the bounds of Int in SparkPlan.executeTake
checkAnswer(
- sqlContext.range(2).limit(2147483638),
+ sqlContext.range(2).toDF().limit(2147483638),
Row(0) :: Row(1) :: Nil
)
}
@@ -1312,7 +1312,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
test("reuse exchange") {
withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "2") {
- val df = sqlContext.range(100)
+ val df = sqlContext.range(100).toDF()
val join = df.join(df, "id")
val plan = join.queryExecution.executedPlan
checkAnswer(join, df)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 836fb1ce85..3efe984c09 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1718,7 +1718,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
test("run sql directly on files") {
- val df = sqlContext.range(100)
+ val df = sqlContext.range(100).toDF()
withTempPath(f => {
df.write.json(f.getCanonicalPath)
checkAnswer(sql(s"select id from json.`${f.getCanonicalPath}`"),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index e00c762c67..716c367eae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -64,7 +64,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
}
test("Sort should be included in WholeStageCodegen") {
- val df = sqlContext.range(3, 0, -1).sort(col("id"))
+ val df = sqlContext.range(3, 0, -1).toDF().sort(col("id"))
val plan = df.queryExecution.executedPlan
assert(plan.find(p =>
p.isInstanceOf[WholeStageCodegen] &&
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index b7834d76cc..4d9a8d7eb1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -599,11 +599,11 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
test("null and non-null strings") {
// Create a dataset where the first values are NULL and then some non-null values. The
// number of non-nulls needs to be bigger than the ParquetReader batch size.
- val data = sqlContext.range(200).rdd.map { i =>
- if (i.getLong(0) < 150) Row(None)
- else Row("a")
- }
- val df = sqlContext.createDataFrame(data, StructType(StructField("col", StringType) :: Nil))
+ val data: Dataset[String] = sqlContext.range(200).map (i =>
+ if (i < 150) null
+ else "a"
+ )
+ val df = data.toDF("col")
assert(df.agg("col" -> "count").collect().head.getLong(0) == 50)
withTempPath { dir =>
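The rewritten Parquet test maps the `Dataset[Long]` directly to nullable strings instead of going through an `RDD[Row]` plus an explicit schema; the `map` needs an implicit `Encoder[String]`, which the suite gets from its shared test implicits. A self-contained sketch of the same construction with that import made explicit (assumption: nothing else from the suite is required):

```scala
import org.apache.spark.sql.{Dataset, SQLContext}

// Assumes only an in-scope SQLContext; the String encoder for map comes from its implicits.
def nullAndNonNullStrings(sqlContext: SQLContext): Unit = {
  import sqlContext.implicits._

  // 150 nulls followed by 50 non-null values, as in the Parquet test above.
  val data: Dataset[String] = sqlContext.range(200).map { i =>
    if (i < 150) null else "a"
  }
  val df = data.toDF("col")

  // count skips nulls, so only the 50 non-null strings are counted.
  assert(df.agg("col" -> "count").collect().head.getLong(0) == 50)
}
```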
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index d7bd215af5..988852a4fc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -128,8 +128,8 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
// Assume the execution plan is
// WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Filter(nodeId = 1))
// TODO: update metrics in generated operators
- val df = sqlContext.range(10).filter('id < 5)
- testSparkPlanMetrics(df, 1, Map.empty)
+ val ds = sqlContext.range(10).filter('id < 5)
+ testSparkPlanMetrics(ds.toDF(), 1, Map.empty)
}
test("TungstenAggregate metrics") {
@@ -157,8 +157,8 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
test("Sort metrics") {
// Assume the execution plan is
// WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Sort(nodeId = 1))
- val df = sqlContext.range(10).sort('id)
- testSparkPlanMetrics(df, 2, Map.empty)
+ val ds = sqlContext.range(10).sort('id)
+ testSparkPlanMetrics(ds.toDF(), 2, Map.empty)
}
test("SortMergeJoin metrics") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index e2effef0b9..d275190744 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
- private lazy val df = sqlContext.range(10).coalesce(1)
+ private lazy val df = sqlContext.range(10).coalesce(1).toDF()
private def checkTablePath(dbName: String, tableName: String): Unit = {
val metastoreTable = hiveContext.sessionState.catalog.client.getTable(dbName, tableName)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index d6c10d6ed9..9667b53e48 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1425,7 +1425,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
test("run sql directly on files") {
- val df = sqlContext.range(100)
+ val df = sqlContext.range(100).toDF()
withTempPath(f => {
df.write.parquet(f.getCanonicalPath)
checkAnswer(sql(s"select id from parquet.`${f.getCanonicalPath}`"),
@@ -1582,7 +1582,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
withView("v") {
sql("CREATE VIEW v AS SELECT * FROM add_col")
sqlContext.range(10).select('id, 'id as 'a).write.mode("overwrite").saveAsTable("add_col")
- checkAnswer(sql("SELECT * FROM v"), sqlContext.range(10))
+ checkAnswer(sql("SELECT * FROM v"), sqlContext.range(10).toDF())
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index 8856148a95..1e5dbd991e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -126,7 +126,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
test("SPARK-8604: Parquet data source should write summary file while doing appending") {
withTempPath { dir =>
val path = dir.getCanonicalPath
- val df = sqlContext.range(0, 5)
+ val df = sqlContext.range(0, 5).toDF()
df.write.mode(SaveMode.Overwrite).parquet(path)
val summaryPath = new Path(path, "_metadata")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 7e09616380..7e5506ee4a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -673,7 +673,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
classOf[AlwaysFailOutputCommitter].getName)
// Code below shouldn't throw since customized output committer should be disabled.
- val df = sqlContext.range(10).coalesce(1)
+ val df = sqlContext.range(10).toDF().coalesce(1)
df.write.format(dataSourceName).save(dir.getCanonicalPath)
checkAnswer(
sqlContext