diff options
Diffstat (limited to 'sql/core')
7 files changed, 32 insertions, 31 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 177d78c4c0..e4d9308692 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -60,6 +60,7 @@ import org.apache.spark.util.Utils * @groupname specificdata Specific Data Sources * @groupname config Configuration * @groupname dataframes Custom DataFrame Creation + * @groupname dataset Custom DataFrame Creation * @groupname Ungrouped Support functions for language integrated queries * @since 1.0.0 */ @@ -716,53 +717,53 @@ class SQLContext private[sql]( /** * :: Experimental :: - * Creates a [[DataFrame]] with a single [[LongType]] column named `id`, containing elements + * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements * in an range from 0 to `end` (exclusive) with step value 1. * - * @since 1.4.1 - * @group dataframe + * @since 2.0.0 + * @group dataset */ @Experimental - def range(end: Long): DataFrame = range(0, end) + def range(end: Long): Dataset[Long] = range(0, end) /** * :: Experimental :: - * Creates a [[DataFrame]] with a single [[LongType]] column named `id`, containing elements + * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements * in an range from `start` to `end` (exclusive) with step value 1. * - * @since 1.4.0 - * @group dataframe + * @since 2.0.0 + * @group dataset */ @Experimental - def range(start: Long, end: Long): DataFrame = { + def range(start: Long, end: Long): Dataset[Long] = { range(start, end, step = 1, numPartitions = sparkContext.defaultParallelism) } /** * :: Experimental :: - * Creates a [[DataFrame]] with a single [[LongType]] column named `id`, containing elements + * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements * in an range from `start` to `end` (exclusive) with an step value. * * @since 2.0.0 - * @group dataframe + * @group dataset */ @Experimental - def range(start: Long, end: Long, step: Long): DataFrame = { + def range(start: Long, end: Long, step: Long): Dataset[Long] = { range(start, end, step, numPartitions = sparkContext.defaultParallelism) } /** * :: Experimental :: - * Creates a [[DataFrame]] with a single [[LongType]] column named `id`, containing elements + * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements * in an range from `start` to `end` (exclusive) with an step value, with partition number * specified. * - * @since 1.4.0 - * @group dataframe + * @since 2.0.0 + * @group dataset */ @Experimental - def range(start: Long, end: Long, step: Long, numPartitions: Int): DataFrame = { - Dataset.newDataFrame(this, Range(start, end, step, numPartitions)) + def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[Long] = { + new Dataset(this, Range(start, end, step, numPartitions), implicits.newLongEncoder) } /** diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java index 7fe17e0cf7..f3c5a86e20 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java @@ -328,7 +328,7 @@ public class JavaDataFrameSuite { @Test public void testCountMinSketch() { - Dataset<Row> df = context.range(1000); + Dataset df = context.range(1000); CountMinSketch sketch1 = df.stat().countMinSketch("id", 10, 20, 42); Assert.assertEquals(sketch1.totalCount(), 1000); @@ -353,7 +353,7 @@ public class JavaDataFrameSuite { @Test public void testBloomFilter() { - Dataset<Row> df = context.range(1000); + Dataset df = context.range(1000); BloomFilter filter1 = df.stat().bloomFilter("id", 1000, 0.03); Assert.assertTrue(filter1.expectedFpp() - 0.03 < 1e-3); diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 2333fa27ca..199e138abf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -344,7 +344,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { // SPARK-12340: overstep the bounds of Int in SparkPlan.executeTake checkAnswer( - sqlContext.range(2).limit(2147483638), + sqlContext.range(2).toDF().limit(2147483638), Row(0) :: Row(1) :: Nil ) } @@ -1312,7 +1312,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { test("reuse exchange") { withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "2") { - val df = sqlContext.range(100) + val df = sqlContext.range(100).toDF() val join = df.join(df, "id") val plan = join.queryExecution.executedPlan checkAnswer(join, df) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 836fb1ce85..3efe984c09 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1718,7 +1718,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } test("run sql directly on files") { - val df = sqlContext.range(100) + val df = sqlContext.range(100).toDF() withTempPath(f => { df.write.json(f.getCanonicalPath) checkAnswer(sql(s"select id from json.`${f.getCanonicalPath}`"), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index e00c762c67..716c367eae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -64,7 +64,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { } test("Sort should be included in WholeStageCodegen") { - val df = sqlContext.range(3, 0, -1).sort(col("id")) + val df = sqlContext.range(3, 0, -1).toDF().sort(col("id")) val plan = df.queryExecution.executedPlan assert(plan.find(p => p.isInstanceOf[WholeStageCodegen] && diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index b7834d76cc..4d9a8d7eb1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -599,11 +599,11 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { test("null and non-null strings") { // Create a dataset where the first values are NULL and then some non-null values. The // number of non-nulls needs to be bigger than the ParquetReader batch size. - val data = sqlContext.range(200).rdd.map { i => - if (i.getLong(0) < 150) Row(None) - else Row("a") - } - val df = sqlContext.createDataFrame(data, StructType(StructField("col", StringType) :: Nil)) + val data: Dataset[String] = sqlContext.range(200).map (i => + if (i < 150) null + else "a" + ) + val df = data.toDF("col") assert(df.agg("col" -> "count").collect().head.getLong(0) == 50) withTempPath { dir => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index d7bd215af5..988852a4fc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -128,8 +128,8 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { // Assume the execution plan is // WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Filter(nodeId = 1)) // TODO: update metrics in generated operators - val df = sqlContext.range(10).filter('id < 5) - testSparkPlanMetrics(df, 1, Map.empty) + val ds = sqlContext.range(10).filter('id < 5) + testSparkPlanMetrics(ds.toDF(), 1, Map.empty) } test("TungstenAggregate metrics") { @@ -157,8 +157,8 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { test("Sort metrics") { // Assume the execution plan is // WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Sort(nodeId = 1)) - val df = sqlContext.range(10).sort('id) - testSparkPlanMetrics(df, 2, Map.empty) + val ds = sqlContext.range(10).sort('id) + testSparkPlanMetrics(ds.toDF(), 2, Map.empty) } test("SortMergeJoin metrics") { |