author     Cheng Hao <hao.cheng@intel.com>    2016-03-16 11:20:15 -0700
committer  Reynold Xin <rxin@databricks.com>  2016-03-16 11:20:15 -0700
commit     d9670f84739b0840501b19b8cb0e851850edb8c1 (patch)
tree       a664b7f0ca4e64b025408d770412dd8be8c07422 /sql/core
parent     d9e8f26d0334f393e3b02d7a3b607be54a2a5efe (diff)
download   spark-d9670f84739b0840501b19b8cb0e851850edb8c1.tar.gz
           spark-d9670f84739b0840501b19b8cb0e851850edb8c1.tar.bz2
           spark-d9670f84739b0840501b19b8cb0e851850edb8c1.zip
[SPARK-13894][SQL] SqlContext.range return type from DataFrame to DataSet
## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-13894

Change the return type of the `SQLContext.range` API from `DataFrame` to `Dataset`.

## How was this patch tested?

No additional unit test required.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #11730 from chenghao-intel/range.
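For callers, the visible effect of this change is in the return type only: `range` now yields a typed `Dataset[Long]`, and code that needs the old untyped behaviour can convert explicitly. A minimal before/after sketch, assuming an existing `SQLContext` named `sqlContext` (illustrative caller code, not part of this patch):

```scala
import org.apache.spark.sql.{DataFrame, Dataset}

// Before this patch: range returned an untyped DataFrame.
// val df: DataFrame = sqlContext.range(10)

// After this patch: range returns a typed Dataset[Long].
val ds: Dataset[Long] = sqlContext.range(10)

// Callers that still want a DataFrame can convert explicitly.
val df: DataFrame = ds.toDF()
df.select("id").show()
```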
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                                   | 33
-rw-r--r--  sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java                        |  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala                               |  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala                                |  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala             |  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala | 10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala             |  8
7 files changed, 32 insertions, 31 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 177d78c4c0..e4d9308692 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -60,6 +60,7 @@ import org.apache.spark.util.Utils
* @groupname specificdata Specific Data Sources
* @groupname config Configuration
* @groupname dataframes Custom DataFrame Creation
+ * @groupname dataset Custom Dataset Creation
* @groupname Ungrouped Support functions for language integrated queries
* @since 1.0.0
*/
@@ -716,53 +717,53 @@ class SQLContext private[sql](
/**
* :: Experimental ::
- * Creates a [[DataFrame]] with a single [[LongType]] column named `id`, containing elements
+ * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements
 * in a range from 0 to `end` (exclusive) with step value 1.
*
- * @since 1.4.1
- * @group dataframe
+ * @since 2.0.0
+ * @group dataset
*/
@Experimental
- def range(end: Long): DataFrame = range(0, end)
+ def range(end: Long): Dataset[Long] = range(0, end)
/**
* :: Experimental ::
- * Creates a [[DataFrame]] with a single [[LongType]] column named `id`, containing elements
+ * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements
 * in a range from `start` to `end` (exclusive) with step value 1.
*
- * @since 1.4.0
- * @group dataframe
+ * @since 2.0.0
+ * @group dataset
*/
@Experimental
- def range(start: Long, end: Long): DataFrame = {
+ def range(start: Long, end: Long): Dataset[Long] = {
range(start, end, step = 1, numPartitions = sparkContext.defaultParallelism)
}
/**
* :: Experimental ::
- * Creates a [[DataFrame]] with a single [[LongType]] column named `id`, containing elements
+ * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements
 * in a range from `start` to `end` (exclusive) with a step value.
*
* @since 2.0.0
- * @group dataframe
+ * @group dataset
*/
@Experimental
- def range(start: Long, end: Long, step: Long): DataFrame = {
+ def range(start: Long, end: Long, step: Long): Dataset[Long] = {
range(start, end, step, numPartitions = sparkContext.defaultParallelism)
}
/**
* :: Experimental ::
- * Creates a [[DataFrame]] with a single [[LongType]] column named `id`, containing elements
+ * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements
 * in a range from `start` to `end` (exclusive) with a step value, with the number of partitions
* specified.
*
- * @since 1.4.0
- * @group dataframe
+ * @since 2.0.0
+ * @group dataset
*/
@Experimental
- def range(start: Long, end: Long, step: Long, numPartitions: Int): DataFrame = {
- Dataset.newDataFrame(this, Range(start, end, step, numPartitions))
+ def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[Long] = {
+ new Dataset(this, Range(start, end, step, numPartitions), implicits.newLongEncoder)
}
/**
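Since all four `range` overloads now return `Dataset[Long]`, typed operations apply directly to the result; the single `LongType` column is still named `id`. A small usage sketch under those assumptions (hypothetical caller code, not part of the patch):

```scala
import org.apache.spark.sql.Dataset
import sqlContext.implicits._  // encoders needed by the typed map/filter below

val ds: Dataset[Long] = sqlContext.range(0, 100, 2, numPartitions = 4)

// Typed operations work on Long values directly, with no Row extraction.
val small: Dataset[Long] = ds.filter(_ < 50)
val doubled: Dataset[Long] = small.map(_ * 2)

// The untyped, column-based API remains one call away.
val df = doubled.toDF()
```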
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index 7fe17e0cf7..f3c5a86e20 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -328,7 +328,7 @@ public class JavaDataFrameSuite {
@Test
public void testCountMinSketch() {
- Dataset<Row> df = context.range(1000);
+ Dataset df = context.range(1000);
CountMinSketch sketch1 = df.stat().countMinSketch("id", 10, 20, 42);
Assert.assertEquals(sketch1.totalCount(), 1000);
@@ -353,7 +353,7 @@ public class JavaDataFrameSuite {
@Test
public void testBloomFilter() {
- Dataset<Row> df = context.range(1000);
+ Dataset df = context.range(1000);
BloomFilter filter1 = df.stat().bloomFilter("id", 1000, 0.03);
Assert.assertTrue(filter1.expectedFpp() - 0.03 < 1e-3);
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 2333fa27ca..199e138abf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -344,7 +344,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
// SPARK-12340: overstep the bounds of Int in SparkPlan.executeTake
checkAnswer(
- sqlContext.range(2).limit(2147483638),
+ sqlContext.range(2).toDF().limit(2147483638),
Row(0) :: Row(1) :: Nil
)
}
@@ -1312,7 +1312,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
test("reuse exchange") {
withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "2") {
- val df = sqlContext.range(100)
+ val df = sqlContext.range(100).toDF()
val join = df.join(df, "id")
val plan = join.queryExecution.executedPlan
checkAnswer(join, df)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 836fb1ce85..3efe984c09 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1718,7 +1718,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
test("run sql directly on files") {
- val df = sqlContext.range(100)
+ val df = sqlContext.range(100).toDF()
withTempPath(f => {
df.write.json(f.getCanonicalPath)
checkAnswer(sql(s"select id from json.`${f.getCanonicalPath}`"),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index e00c762c67..716c367eae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -64,7 +64,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
}
test("Sort should be included in WholeStageCodegen") {
- val df = sqlContext.range(3, 0, -1).sort(col("id"))
+ val df = sqlContext.range(3, 0, -1).toDF().sort(col("id"))
val plan = df.queryExecution.executedPlan
assert(plan.find(p =>
p.isInstanceOf[WholeStageCodegen] &&
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index b7834d76cc..4d9a8d7eb1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -599,11 +599,11 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
test("null and non-null strings") {
// Create a dataset where the first values are NULL and then some non-null values. The
// number of non-nulls needs to be bigger than the ParquetReader batch size.
- val data = sqlContext.range(200).rdd.map { i =>
- if (i.getLong(0) < 150) Row(None)
- else Row("a")
- }
- val df = sqlContext.createDataFrame(data, StructType(StructField("col", StringType) :: Nil))
+ val data: Dataset[String] = sqlContext.range(200).map (i =>
+ if (i < 150) null
+ else "a"
+ )
+ val df = data.toDF("col")
assert(df.agg("col" -> "count").collect().head.getLong(0) == 50)
withTempPath { dir =>
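The rewritten test builds its input through the typed API instead of an RDD of `Row`s; note that the `map` to `String` relies on an `Encoder[String]` being in scope, typically via `import sqlContext.implicits._` (assumed to already be available in this suite). A standalone sketch of the same pattern:

```scala
import org.apache.spark.sql.Dataset
import sqlContext.implicits._  // provides the Encoder[String] required by map

// First 150 values are null, the remaining 50 are "a".
val data: Dataset[String] = sqlContext.range(200).map(i => if (i < 150) null else "a")
val df = data.toDF("col")

// Only the non-null values count towards the aggregate.
assert(df.agg("col" -> "count").collect().head.getLong(0) == 50)
```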
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index d7bd215af5..988852a4fc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -128,8 +128,8 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
// Assume the execution plan is
// WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Filter(nodeId = 1))
// TODO: update metrics in generated operators
- val df = sqlContext.range(10).filter('id < 5)
- testSparkPlanMetrics(df, 1, Map.empty)
+ val ds = sqlContext.range(10).filter('id < 5)
+ testSparkPlanMetrics(ds.toDF(), 1, Map.empty)
}
test("TungstenAggregate metrics") {
@@ -157,8 +157,8 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
test("Sort metrics") {
// Assume the execution plan is
// WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Sort(nodeId = 1))
- val df = sqlContext.range(10).sort('id)
- testSparkPlanMetrics(df, 2, Map.empty)
+ val ds = sqlContext.range(10).sort('id)
+ testSparkPlanMetrics(ds.toDF(), 2, Map.empty)
}
test("SortMergeJoin metrics") {