Diffstat (limited to 'sql')
11 files changed, 37 insertions, 36 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 177d78c4c0..e4d9308692 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -60,6 +60,7 @@ import org.apache.spark.util.Utils
  * @groupname specificdata Specific Data Sources
  * @groupname config Configuration
  * @groupname dataframes Custom DataFrame Creation
+ * @groupname dataset Custom DataFrame Creation
  * @groupname Ungrouped Support functions for language integrated queries
  * @since 1.0.0
  */
@@ -716,53 +717,53 @@ class SQLContext private[sql](

   /**
    * :: Experimental ::
-   * Creates a [[DataFrame]] with a single [[LongType]] column named `id`, containing elements
+   * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements
    * in an range from 0 to `end` (exclusive) with step value 1.
    *
-   * @since 1.4.1
-   * @group dataframe
+   * @since 2.0.0
+   * @group dataset
    */
   @Experimental
-  def range(end: Long): DataFrame = range(0, end)
+  def range(end: Long): Dataset[Long] = range(0, end)

   /**
    * :: Experimental ::
-   * Creates a [[DataFrame]] with a single [[LongType]] column named `id`, containing elements
+   * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements
    * in an range from `start` to `end` (exclusive) with step value 1.
    *
-   * @since 1.4.0
-   * @group dataframe
+   * @since 2.0.0
+   * @group dataset
    */
   @Experimental
-  def range(start: Long, end: Long): DataFrame = {
+  def range(start: Long, end: Long): Dataset[Long] = {
     range(start, end, step = 1, numPartitions = sparkContext.defaultParallelism)
   }

   /**
    * :: Experimental ::
-   * Creates a [[DataFrame]] with a single [[LongType]] column named `id`, containing elements
+   * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements
    * in an range from `start` to `end` (exclusive) with an step value.
    *
    * @since 2.0.0
-   * @group dataframe
+   * @group dataset
    */
   @Experimental
-  def range(start: Long, end: Long, step: Long): DataFrame = {
+  def range(start: Long, end: Long, step: Long): Dataset[Long] = {
     range(start, end, step, numPartitions = sparkContext.defaultParallelism)
   }

   /**
    * :: Experimental ::
-   * Creates a [[DataFrame]] with a single [[LongType]] column named `id`, containing elements
+   * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements
    * in an range from `start` to `end` (exclusive) with an step value, with partition number
    * specified.
    *
-   * @since 1.4.0
-   * @group dataframe
+   * @since 2.0.0
+   * @group dataset
    */
   @Experimental
-  def range(start: Long, end: Long, step: Long, numPartitions: Int): DataFrame = {
-    Dataset.newDataFrame(this, Range(start, end, step, numPartitions))
+  def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[Long] = {
+    new Dataset(this, Range(start, end, step, numPartitions), implicits.newLongEncoder)
   }

   /**
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index 7fe17e0cf7..f3c5a86e20 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -328,7 +328,7 @@ public class JavaDataFrameSuite {

   @Test
   public void testCountMinSketch() {
-    Dataset<Row> df = context.range(1000);
+    Dataset df = context.range(1000);

     CountMinSketch sketch1 = df.stat().countMinSketch("id", 10, 20, 42);
     Assert.assertEquals(sketch1.totalCount(), 1000);
@@ -353,7 +353,7 @@ public class JavaDataFrameSuite {

   @Test
   public void testBloomFilter() {
-    Dataset<Row> df = context.range(1000);
+    Dataset df = context.range(1000);

     BloomFilter filter1 = df.stat().bloomFilter("id", 1000, 0.03);
     Assert.assertTrue(filter1.expectedFpp() - 0.03 < 1e-3);
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 2333fa27ca..199e138abf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -344,7 +344,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {

     // SPARK-12340: overstep the bounds of Int in SparkPlan.executeTake
     checkAnswer(
-      sqlContext.range(2).limit(2147483638),
+      sqlContext.range(2).toDF().limit(2147483638),
       Row(0) :: Row(1) :: Nil
     )
   }
@@ -1312,7 +1312,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {

   test("reuse exchange") {
     withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "2") {
-      val df = sqlContext.range(100)
+      val df = sqlContext.range(100).toDF()
       val join = df.join(df, "id")
       val plan = join.queryExecution.executedPlan
       checkAnswer(join, df)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 836fb1ce85..3efe984c09 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1718,7 +1718,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }

   test("run sql directly on files") {
-    val df = sqlContext.range(100)
+    val df = sqlContext.range(100).toDF()
     withTempPath(f => {
       df.write.json(f.getCanonicalPath)
       checkAnswer(sql(s"select id from json.`${f.getCanonicalPath}`"),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index e00c762c67..716c367eae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -64,7 +64,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
   }

   test("Sort should be included in WholeStageCodegen") {
-    val df = sqlContext.range(3, 0, -1).sort(col("id"))
+    val df = sqlContext.range(3, 0, -1).toDF().sort(col("id"))
     val plan = df.queryExecution.executedPlan
     assert(plan.find(p =>
       p.isInstanceOf[WholeStageCodegen] &&
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index b7834d76cc..4d9a8d7eb1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -599,11 +599,11 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
   test("null and non-null strings") {
     // Create a dataset where the first values are NULL and then some non-null values. The
     // number of non-nulls needs to be bigger than the ParquetReader batch size.
-    val data = sqlContext.range(200).rdd.map { i =>
-      if (i.getLong(0) < 150) Row(None)
-      else Row("a")
-    }
-    val df = sqlContext.createDataFrame(data, StructType(StructField("col", StringType) :: Nil))
+    val data: Dataset[String] = sqlContext.range(200).map (i =>
+      if (i < 150) null
+      else "a"
+    )
+    val df = data.toDF("col")
     assert(df.agg("col" -> "count").collect().head.getLong(0) == 50)

     withTempPath { dir =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index d7bd215af5..988852a4fc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -128,8 +128,8 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
     // Assume the execution plan is
     // WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Filter(nodeId = 1))
     // TODO: update metrics in generated operators
-    val df = sqlContext.range(10).filter('id < 5)
-    testSparkPlanMetrics(df, 1, Map.empty)
+    val ds = sqlContext.range(10).filter('id < 5)
+    testSparkPlanMetrics(ds.toDF(), 1, Map.empty)
   }

   test("TungstenAggregate metrics") {
@@ -157,8 +157,8 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
   test("Sort metrics") {
     // Assume the execution plan is
     // WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Sort(nodeId = 1))
-    val df = sqlContext.range(10).sort('id)
-    testSparkPlanMetrics(df, 2, Map.empty)
+    val ds = sqlContext.range(10).sort('id)
+    testSparkPlanMetrics(ds.toDF(), 2, Map.empty)
   }

   test("SortMergeJoin metrics") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index e2effef0b9..d275190744 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils

 class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
-  private lazy val df = sqlContext.range(10).coalesce(1)
+  private lazy val df = sqlContext.range(10).coalesce(1).toDF()

   private def checkTablePath(dbName: String, tableName: String): Unit = {
     val metastoreTable = hiveContext.sessionState.catalog.client.getTable(dbName, tableName)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index d6c10d6ed9..9667b53e48 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1425,7 +1425,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   }

   test("run sql directly on files") {
-    val df = sqlContext.range(100)
+    val df = sqlContext.range(100).toDF()
     withTempPath(f => {
       df.write.parquet(f.getCanonicalPath)
       checkAnswer(sql(s"select id from parquet.`${f.getCanonicalPath}`"),
@@ -1582,7 +1582,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       withView("v") {
         sql("CREATE VIEW v AS SELECT * FROM add_col")
         sqlContext.range(10).select('id, 'id as 'a).write.mode("overwrite").saveAsTable("add_col")
-        checkAnswer(sql("SELECT * FROM v"), sqlContext.range(10))
+        checkAnswer(sql("SELECT * FROM v"), sqlContext.range(10).toDF())
       }
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index 8856148a95..1e5dbd991e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -126,7 +126,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
   test("SPARK-8604: Parquet data source should write summary file while doing appending") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath
-      val df = sqlContext.range(0, 5)
+      val df = sqlContext.range(0, 5).toDF()
       df.write.mode(SaveMode.Overwrite).parquet(path)

       val summaryPath = new Path(path, "_metadata")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 7e09616380..7e5506ee4a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -673,7 +673,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
         classOf[AlwaysFailOutputCommitter].getName)

       // Code below shouldn't throw since customized output committer should be disabled.
-      val df = sqlContext.range(10).coalesce(1)
+      val df = sqlContext.range(10).toDF().coalesce(1)
       df.write.format(dataSourceName).save(dir.getCanonicalPath)
       checkAnswer(
         sqlContext
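
For illustration only, here is a minimal sketch (not part of the diff above) of how calling code adapts once SQLContext.range returns Dataset[Long] instead of DataFrame. Only SQLContext, range, toDF and the `id` column come from this change; the object name and helper method below are hypothetical.

import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext}

// Hypothetical example object; everything here is illustrative, not Spark source.
object RangeMigrationSketch {
  def example(sqlContext: SQLContext): Unit = {
    // After this patch, range produces a typed Dataset of Long values in a column named `id`.
    val ds: Dataset[Long] = sqlContext.range(0, 100)

    // Typed operations work directly on the Long elements, mirroring the
    // ParquetIOSuite change from `i.getLong(0) < 150` to `i < 150`.
    val smaller: Dataset[Long] = ds.filter(_ < 50)

    // Callers that still need DataFrame-oriented APIs add an explicit .toDF(),
    // which is what the updated test suites above do.
    val df: DataFrame = ds.toDF()
    val rows: Array[Row] = df.limit(10).collect()
    println(rows.length + " rows, " + smaller.count() + " values below 50")
  }
}

The test-suite edits follow the same pattern: keep the typed Dataset where element-level logic is wanted, and call .toDF() at the point where a DataFrame is required.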