From cde086cb2a9a85406fc18d8e63e46425f614c15f Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Thu, 24 Mar 2016 00:42:13 +0800
Subject: [SPARK-13817][SQL][MINOR] Renames Dataset.newDataFrame to Dataset.ofRows

## What changes were proposed in this pull request?

This PR does the renaming as suggested by marmbrus in [this comment][1].

## How was this patch tested?

Existing tests.

[1]: https://github.com/apache/spark/commit/6d37e1eb90054cdb6323b75fb202f78ece604b15#commitcomment-16654694

Author: Cheng Lian

Closes #11889 from liancheng/spark-13817-follow-up.
---
 .../org/apache/spark/sql/DataFrameReader.scala      |  8 +++----
 .../main/scala/org/apache/spark/sql/Dataset.scala   |  4 ++--
 .../apache/spark/sql/KeyValueGroupedDataset.scala   |  2 +-
 .../spark/sql/RelationalGroupedDataset.scala        |  8 +++----
 .../scala/org/apache/spark/sql/SQLContext.scala     | 26 +++++++++++-----------
 .../spark/sql/execution/command/commands.scala      |  2 +-
 .../sql/execution/datasources/DataSource.scala      |  2 +-
 .../datasources/InsertIntoDataSource.scala          |  2 +-
 .../datasources/InsertIntoHadoopFsRelation.scala    |  2 +-
 .../spark/sql/execution/datasources/ddl.scala       |  8 +++----
 .../spark/sql/execution/stat/FrequentItems.scala    |  2 +-
 .../spark/sql/execution/stat/StatFunctions.scala    |  2 +-
 .../sql/execution/streaming/StreamExecution.scala   |  2 +-
 .../spark/sql/execution/streaming/memory.scala      |  2 +-
 .../scala/org/apache/spark/sql/functions.scala      |  2 +-
 .../org/apache/spark/sql/DataFrameSuite.scala       |  2 +-
 .../scala/org/apache/spark/sql/StreamTest.scala     |  2 +-
 .../datasources/FileSourceStrategySuite.scala       |  2 +-
 .../org/apache/spark/sql/test/SQLTestUtils.scala    |  2 +-
 .../apache/spark/sql/hive/execution/commands.scala  |  2 +-
 .../org/apache/spark/sql/hive/SQLBuilderTest.scala  |  2 +-
 .../sql/hive/execution/AggregationQuerySuite.scala  |  2 +-
 22 files changed, 44 insertions(+), 44 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 1d4693f54f..704535adaa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -129,7 +129,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
       userSpecifiedSchema = userSpecifiedSchema,
       className = source,
       options = extraOptions.toMap)
-    Dataset.newDataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation()))
+    Dataset.ofRows(sqlContext, LogicalRelation(dataSource.resolveRelation()))
   }
 
   /**
@@ -176,7 +176,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
       userSpecifiedSchema = userSpecifiedSchema,
       className = source,
       options = extraOptions.toMap)
-    Dataset.newDataFrame(sqlContext, StreamingRelation(dataSource.createSource()))
+    Dataset.ofRows(sqlContext, StreamingRelation(dataSource.createSource()))
   }
 
   /**
@@ -376,7 +376,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
         parsedOptions)
     }
 
-    Dataset.newDataFrame(
+    Dataset.ofRows(
       sqlContext,
       LogicalRDD(
         schema.toAttributes,
@@ -424,7 +424,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * @since 1.4.0
    */
   def table(tableName: String): DataFrame = {
-    Dataset.newDataFrame(sqlContext,
+    Dataset.ofRows(sqlContext,
       sqlContext.sessionState.catalog.lookupRelation(
         sqlContext.sessionState.sqlParser.parseTableIdentifier(tableName)))
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 31864d63ab..ec0b3c78ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -53,7 +53,7 @@ private[sql] object Dataset {
     new Dataset(sqlContext, logicalPlan, implicitly[Encoder[T]])
   }
 
-  def newDataFrame(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
+  def ofRows(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
     val qe = sqlContext.executePlan(logicalPlan)
     qe.assertAnalyzed()
     new Dataset[Row](sqlContext, logicalPlan, RowEncoder(qe.analyzed.schema))
@@ -2322,7 +2322,7 @@ class Dataset[T] private[sql](
 
   /** A convenient function to wrap a logical plan and produce a DataFrame. */
   @inline private def withPlan(logicalPlan: => LogicalPlan): DataFrame = {
-    Dataset.newDataFrame(sqlContext, logicalPlan)
+    Dataset.ofRows(sqlContext, logicalPlan)
   }
 
   /** A convenient function to wrap a logical plan and produce a Dataset. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index 8bb75bf2bf..07aa1515f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -59,7 +59,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
 
   private def groupedData = {
     new RelationalGroupedDataset(
-      Dataset.newDataFrame(sqlContext, logicalPlan),
+      Dataset.ofRows(sqlContext, logicalPlan),
       groupingAttributes,
       RelationalGroupedDataset.GroupByType)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index 521032a8b3..91c02053ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -52,17 +52,17 @@ class RelationalGroupedDataset protected[sql](
 
     groupType match {
       case RelationalGroupedDataset.GroupByType =>
-        Dataset.newDataFrame(
+        Dataset.ofRows(
           df.sqlContext, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan))
       case RelationalGroupedDataset.RollupType =>
-        Dataset.newDataFrame(
+        Dataset.ofRows(
           df.sqlContext, Aggregate(Seq(Rollup(groupingExprs)), aliasedAgg, df.logicalPlan))
       case RelationalGroupedDataset.CubeType =>
-        Dataset.newDataFrame(
+        Dataset.ofRows(
           df.sqlContext, Aggregate(Seq(Cube(groupingExprs)), aliasedAgg, df.logicalPlan))
       case RelationalGroupedDataset.PivotType(pivotCol, values) =>
         val aliasedGrps = groupingExprs.map(alias)
-        Dataset.newDataFrame(
+        Dataset.ofRows(
           df.sqlContext, Pivot(aliasedGrps, pivotCol, values, aggExprs, df.logicalPlan))
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 542f2f4deb..853a74c827 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -351,7 +351,7 @@ class SQLContext private[sql](
     val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
     val attributeSeq = schema.toAttributes
     val rowRDD = RDDConversions.productToRowRdd(rdd, schema.map(_.dataType))
-    Dataset.newDataFrame(self, LogicalRDD(attributeSeq, rowRDD)(self))
+    Dataset.ofRows(self, LogicalRDD(attributeSeq, rowRDD)(self))
   }
 
   /**
@@ -366,7 +366,7 @@ class SQLContext private[sql](
     SQLContext.setActive(self)
     val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
     val attributeSeq = schema.toAttributes
-    Dataset.newDataFrame(self, LocalRelation.fromProduct(attributeSeq, data))
+    Dataset.ofRows(self, LocalRelation.fromProduct(attributeSeq, data))
   }
 
   /**
@@ -376,7 +376,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
-    Dataset.newDataFrame(this, LogicalRelation(baseRelation))
+    Dataset.ofRows(this, LogicalRelation(baseRelation))
   }
 
   /**
@@ -431,7 +431,7 @@ class SQLContext private[sql](
       rowRDD.map{r: Row => InternalRow.fromSeq(r.toSeq)}
     }
     val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
-    Dataset.newDataFrame(this, logicalPlan)
+    Dataset.ofRows(this, logicalPlan)
   }
 
 
@@ -466,7 +466,7 @@ class SQLContext private[sql](
     // TODO: use MutableProjection when rowRDD is another DataFrame and the applied
     // schema differs from the existing schema on any field data type.
     val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
-    Dataset.newDataFrame(this, logicalPlan)
+    Dataset.ofRows(this, logicalPlan)
   }
 
   /**
@@ -494,7 +494,7 @@ class SQLContext private[sql](
    */
   @DeveloperApi
   def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame = {
-    Dataset.newDataFrame(self, LocalRelation.fromExternalRows(schema.toAttributes, rows.asScala))
+    Dataset.ofRows(self, LocalRelation.fromExternalRows(schema.toAttributes, rows.asScala))
   }
 
   /**
@@ -513,7 +513,7 @@ class SQLContext private[sql](
       val localBeanInfo = Introspector.getBeanInfo(Utils.classForName(className))
       SQLContext.beansToRows(iter, localBeanInfo, attributeSeq)
     }
-    Dataset.newDataFrame(this, LogicalRDD(attributeSeq, rowRdd)(this))
+    Dataset.ofRows(this, LogicalRDD(attributeSeq, rowRdd)(this))
   }
 
   /**
@@ -541,7 +541,7 @@ class SQLContext private[sql](
     val className = beanClass.getName
     val beanInfo = Introspector.getBeanInfo(beanClass)
     val rows = SQLContext.beansToRows(data.asScala.iterator, beanInfo, attrSeq)
-    Dataset.newDataFrame(self, LocalRelation(attrSeq, rows.toSeq))
+    Dataset.ofRows(self, LocalRelation(attrSeq, rows.toSeq))
   }
 
   /**
@@ -759,7 +759,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def sql(sqlText: String): DataFrame = {
-    Dataset.newDataFrame(this, parseSql(sqlText))
+    Dataset.ofRows(this, parseSql(sqlText))
   }
 
   /**
@@ -782,7 +782,7 @@ class SQLContext private[sql](
   }
 
   private def table(tableIdent: TableIdentifier): DataFrame = {
-    Dataset.newDataFrame(this, sessionState.catalog.lookupRelation(tableIdent))
+    Dataset.ofRows(this, sessionState.catalog.lookupRelation(tableIdent))
   }
 
   /**
@@ -794,7 +794,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def tables(): DataFrame = {
-    Dataset.newDataFrame(this, ShowTablesCommand(None))
+    Dataset.ofRows(this, ShowTablesCommand(None))
   }
 
   /**
@@ -806,7 +806,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def tables(databaseName: String): DataFrame = {
-    Dataset.newDataFrame(this, ShowTablesCommand(Some(databaseName)))
+    Dataset.ofRows(this, ShowTablesCommand(Some(databaseName)))
   }
 
   /**
@@ -871,7 +871,7 @@ class SQLContext private[sql](
       schema: StructType): DataFrame = {
     val rowRdd = rdd.map(r => python.EvaluatePython.fromJava(r, schema).asInstanceOf[InternalRow])
-    Dataset.newDataFrame(this, LogicalRDD(schema.toAttributes, rowRdd)(self))
+    Dataset.ofRows(this, LogicalRDD(schema.toAttributes, rowRdd)(self))
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index cd769d0137..59c3ffcf48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -261,7 +261,7 @@ case class CacheTableCommand(
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
     plan.foreach { logicalPlan =>
-      sqlContext.registerDataFrameAsTable(Dataset.newDataFrame(sqlContext, logicalPlan), tableName)
+      sqlContext.registerDataFrameAsTable(Dataset.ofRows(sqlContext, logicalPlan), tableName)
     }
     sqlContext.cacheTable(tableName)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index fac2a64726..548da86359 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -154,7 +154,7 @@ case class DataSource(
     }
 
     def dataFrameBuilder(files: Array[String]): DataFrame = {
-      Dataset.newDataFrame(
+      Dataset.ofRows(
         sqlContext,
         LogicalRelation(
           DataSource(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
index 9cf794804d..37c2c4517c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
@@ -34,7 +34,7 @@ private[sql] case class InsertIntoDataSource(
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
-    val data = Dataset.newDataFrame(sqlContext, query)
+    val data = Dataset.ofRows(sqlContext, query)
     // Apply the schema of the existing table to the new data.
     val df = sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
     relation.insert(df, overwrite)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
index 51ec969daf..a30b52080f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -114,7 +114,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
       val partitionSet = AttributeSet(partitionColumns)
       val dataColumns = query.output.filterNot(partitionSet.contains)
 
-      val queryExecution = Dataset.newDataFrame(sqlContext, query).queryExecution
+      val queryExecution = Dataset.ofRows(sqlContext, query).queryExecution
       SQLExecution.withNewExecutionId(sqlContext, queryExecution) {
         val relation = WriteRelation(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 7ca0e8859a..9e8e0352db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -101,7 +101,7 @@ case class CreateTempTableUsing(
       options = options)
     sqlContext.sessionState.catalog.registerTable(
       tableIdent,
-      Dataset.newDataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan)
+      Dataset.ofRows(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan)
 
     Seq.empty[Row]
   }
@@ -116,7 +116,7 @@ case class CreateTempTableUsingAsSelect(
     query: LogicalPlan) extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
-    val df = Dataset.newDataFrame(sqlContext, query)
+    val df = Dataset.ofRows(sqlContext, query)
     val dataSource = DataSource(
       sqlContext,
       className = provider,
@@ -126,7 +126,7 @@ case class CreateTempTableUsingAsSelect(
     val result = dataSource.write(mode, df)
     sqlContext.sessionState.catalog.registerTable(
       tableIdent,
-      Dataset.newDataFrame(sqlContext, LogicalRelation(result)).logicalPlan)
+      Dataset.ofRows(sqlContext, LogicalRelation(result)).logicalPlan)
 
     Seq.empty[Row]
   }
@@ -147,7 +147,7 @@ case class RefreshTable(tableIdent: TableIdentifier)
     if (isCached) {
       // Create a data frame to represent the table.
      // TODO: Use uncacheTable once it supports database name.
-      val df = Dataset.newDataFrame(sqlContext, logicalPlan)
+      val df = Dataset.ofRows(sqlContext, logicalPlan)
       // Uncache the logicalPlan.
       sqlContext.cacheManager.tryUncacheQuery(df, blocking = true)
       // Cache it again.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
index bccd2a44d9..8c2231335c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
@@ -121,6 +121,6 @@ private[sql] object FrequentItems extends Logging {
       StructField(v._1 + "_freqItems", ArrayType(v._2, false))
     }
     val schema = StructType(outputCols).toAttributes
-    Dataset.newDataFrame(df.sqlContext, LocalRelation.fromExternalRows(schema, Seq(resultRow)))
+    Dataset.ofRows(df.sqlContext, LocalRelation.fromExternalRows(schema, Seq(resultRow)))
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
index 0a0dccbad1..e0b6709c51 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
@@ -454,6 +454,6 @@ private[sql] object StatFunctions extends Logging {
     }
     val schema = StructType(StructField(tableName, StringType) +: headerNames)
 
-    Dataset.newDataFrame(df.sqlContext, LocalRelation(schema.toAttributes, table)).na.fill(0.0)
+    Dataset.ofRows(df.sqlContext, LocalRelation(schema.toAttributes, table)).na.fill(0.0)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index c5fefb5346..29b058f2e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -288,7 +288,7 @@ class StreamExecution(
     val optimizerTime = (System.nanoTime() - optimizerStart).toDouble / 1000000
     logDebug(s"Optimized batch in ${optimizerTime}ms")
 
-    val nextBatch = Dataset.newDataFrame(sqlContext, newPlan)
+    val nextBatch = Dataset.ofRows(sqlContext, newPlan)
     sink.addBatch(currentBatchId - 1, nextBatch)
 
     awaitBatchLock.synchronized {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 8c2bb4abd5..8bc8bcaa96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -58,7 +58,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   }
 
   def toDF()(implicit sqlContext: SQLContext): DataFrame = {
-    Dataset.newDataFrame(sqlContext, logicalPlan)
+    Dataset.ofRows(sqlContext, logicalPlan)
   }
 
   def addData(data: A*): Offset = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index dd4aa9e93a..304d747d4f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -917,7 +917,7 @@ object functions {
    * @since 1.5.0
    */
  def broadcast(df: DataFrame): DataFrame = {
-    Dataset.newDataFrame(df.sqlContext, BroadcastHint(df.logicalPlan))
+    Dataset.ofRows(df.sqlContext, BroadcastHint(df.logicalPlan))
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index f60c5ea759..e6b7dc9199 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -956,7 +956,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     assert(e2.getMessage.contains("Inserting into an RDD-based table is not allowed."))
 
     // error case: insert into an OneRowRelation
-    Dataset.newDataFrame(sqlContext, OneRowRelation).registerTempTable("one_row")
+    Dataset.ofRows(sqlContext, OneRowRelation).registerTempTable("one_row")
     val e3 = intercept[AnalysisException] {
       insertion.write.insertInto("one_row")
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
index 2dd6416853..4ca739450c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
@@ -66,7 +66,7 @@ import org.apache.spark.util.Utils
 trait StreamTest extends QueryTest with Timeouts {
 
   implicit class RichSource(s: Source) {
-    def toDF(): DataFrame = Dataset.newDataFrame(sqlContext, StreamingRelation(s))
+    def toDF(): DataFrame = Dataset.ofRows(sqlContext, StreamingRelation(s))
 
     def toDS[A: Encoder](): Dataset[A] = Dataset(sqlContext, StreamingRelation(s))
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 4abc6d6a55..1fa15730bc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -268,7 +268,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
         l.copy(relation =
           r.copy(bucketSpec = Some(BucketSpec(numBuckets = buckets, "c1" :: Nil, Nil))))
       }
-      Dataset.newDataFrame(sqlContext, bucketed)
+      Dataset.ofRows(sqlContext, bucketed)
     } else {
       df
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index ab3876728b..d48358566e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -214,7 +214,7 @@ private[sql] trait SQLTestUtils
    * way to construct [[DataFrame]] directly out of local data without relying on implicits.
    */
   protected implicit def logicalPlanToSparkQuery(plan: LogicalPlan): DataFrame = {
-    Dataset.newDataFrame(sqlContext, plan)
+    Dataset.ofRows(sqlContext, plan)
   }
 
   /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index ff66573620..226b8e1796 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -247,7 +247,7 @@ case class CreateMetastoreDataSourceAsSelect(
       createMetastoreTable = true
     }
 
-    val data = Dataset.newDataFrame(hiveContext, query)
+    val data = Dataset.ofRows(hiveContext, query)
     val df = existingSchema match {
       // If we are inserting into an existing table, just use the existing schema.
       case Some(s) => sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, s)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SQLBuilderTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SQLBuilderTest.scala
index 047e82e411..9a63ecb4ca 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SQLBuilderTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SQLBuilderTest.scala
@@ -63,7 +63,7 @@ abstract class SQLBuilderTest extends QueryTest with TestHiveSingleton {
          """.stripMargin)
     }
 
-    checkAnswer(sqlContext.sql(generatedSQL), Dataset.newDataFrame(sqlContext, plan))
+    checkAnswer(sqlContext.sql(generatedSQL), Dataset.ofRows(sqlContext, plan))
   }
 
   protected def checkSQL(df: DataFrame, expectedSQL: String): Unit = {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index 5c26aa1a79..81fd71201d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -968,7 +968,7 @@ class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQue
         // Create a new df to make sure its physical operator picks up
         // spark.sql.TungstenAggregate.testFallbackStartsAt.
        // todo: remove it?
-        val newActual = Dataset.newDataFrame(sqlContext, actual.logicalPlan)
+        val newActual = Dataset.ofRows(sqlContext, actual.logicalPlan)
 
         QueryTest.checkAnswer(newActual, expectedAnswer) match {
           case Some(errorMessage) =>
-- 
cgit v1.2.3
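
For context, a minimal sketch (not part of the patch) of how the renamed factory is called after this change. It relies only on the signature shown in the `Dataset.scala` hunk above; because `Dataset.ofRows` lives in the `private[sql]` companion object, the sketch assumes it is compiled inside the `org.apache.spark.sql` package, like the internal call sites in this diff. The object name `OfRowsExample` and the `sqlContext` parameter are illustrative only.

```scala
package org.apache.spark.sql

import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}

object OfRowsExample {
  // Wrap an arbitrary logical plan into a DataFrame (i.e. Dataset[Row]).
  // Before this patch the same call was spelled Dataset.newDataFrame(sqlContext, plan).
  def wrap(sqlContext: SQLContext, plan: LogicalPlan): DataFrame =
    Dataset.ofRows(sqlContext, plan)

  // Mirrors the DataFrameSuite change above: build a single-row DataFrame
  // directly from the OneRowRelation logical plan.
  def oneRow(sqlContext: SQLContext): DataFrame =
    Dataset.ofRows(sqlContext, OneRowRelation)
}
```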