From 6d37e1eb90054cdb6323b75fb202f78ece604b15 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Fri, 11 Mar 2016 22:17:50 +0800
Subject: [SPARK-13817][BUILD][SQL] Re-enable MiMA and remove object DataFrame

## What changes were proposed in this pull request?

PR #11443 temporarily disabled the MiMA check; this PR re-enables it.

One extra change is that `object DataFrame` is also removed. The only purpose
of introducing `object DataFrame` was to use it as an internal factory for
creating `Dataset[Row]`. Replacing this internal factory with
`Dataset.newDataFrame` removes both `DataFrame` and `DataFrame$` entirely from
the API, so that we can simply put a `MissingClassProblem` filter in
`MimaExcludes.scala` for most DataFrame API changes.

## How was this patch tested?

Tested by the MiMA check triggered by Jenkins.

Author: Cheng Lian

Closes #11656 from liancheng/re-enable-mima.
---
 .../scala/org/apache/spark/sql/DataFrame.scala     | 16 ++++++-------
 .../org/apache/spark/sql/DataFrameReader.scala     |  8 +++----
 .../scala/org/apache/spark/sql/GroupedData.scala   |  8 +++----
 .../org/apache/spark/sql/GroupedDataset.scala      |  2 +-
 .../scala/org/apache/spark/sql/SQLContext.scala    | 28 +++++++++++-----------
 .../spark/sql/execution/command/commands.scala     |  4 ++--
 .../sql/execution/datasources/DataSource.scala     |  4 ++--
 .../datasources/InsertIntoDataSource.scala         |  2 +-
 .../datasources/InsertIntoHadoopFsRelation.scala   |  2 +-
 .../spark/sql/execution/datasources/ddl.scala      | 10 ++++----
 .../spark/sql/execution/stat/FrequentItems.scala   |  4 ++--
 .../spark/sql/execution/stat/StatFunctions.scala   |  4 ++--
 .../sql/execution/streaming/StreamExecution.scala  |  2 +-
 .../spark/sql/execution/streaming/memory.scala     |  2 +-
 .../scala/org/apache/spark/sql/functions.scala     |  2 +-
 .../org/apache/spark/sql/DataFrameSuite.scala      |  2 +-
 .../scala/org/apache/spark/sql/StreamTest.scala    |  2 +-
 .../org/apache/spark/sql/test/SQLTestUtils.scala   |  2 +-
 .../apache/spark/sql/hive/execution/commands.scala |  2 +-
 .../org/apache/spark/sql/hive/SQLBuilderTest.scala |  4 ++--
 .../sql/hive/execution/AggregationQuerySuite.scala |  2 +-
 21 files changed, 55 insertions(+), 57 deletions(-)

(limited to 'sql')

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 17a91975f4..f1791e6943 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -48,18 +48,16 @@ import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
 
-private[sql] object DataFrame {
-  def apply(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
-    val qe = sqlContext.executePlan(logicalPlan)
-    qe.assertAnalyzed()
-    new Dataset[Row](sqlContext, logicalPlan, RowEncoder(qe.analyzed.schema))
-  }
-}
-
 private[sql] object Dataset {
   def apply[T: Encoder](sqlContext: SQLContext, logicalPlan: LogicalPlan): Dataset[T] = {
     new Dataset(sqlContext, logicalPlan, implicitly[Encoder[T]])
   }
+
+  def newDataFrame(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
+    val qe = sqlContext.executePlan(logicalPlan)
+    qe.assertAnalyzed()
+    new Dataset[Row](sqlContext, logicalPlan, RowEncoder(qe.analyzed.schema))
+  }
 }
 
 /**
@@ -2129,7 +2127,7 @@ class Dataset[T] private[sql](
 
   /** A convenient function to wrap a logical plan and produce a DataFrame. */
   @inline private def withPlan(logicalPlan: => LogicalPlan): DataFrame = {
-    DataFrame(sqlContext, logicalPlan)
+    Dataset.newDataFrame(sqlContext, logicalPlan)
   }
 
   /** A convenient function to wrap a logical plan and produce a DataFrame. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 822702429d..52b567ea25 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -128,7 +128,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
       userSpecifiedSchema = userSpecifiedSchema,
       className = source,
       options = extraOptions.toMap)
-    DataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation()))
+    Dataset.newDataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation()))
   }
 
   /**
@@ -175,7 +175,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
       userSpecifiedSchema = userSpecifiedSchema,
       className = source,
       options = extraOptions.toMap)
-    DataFrame(sqlContext, StreamingRelation(dataSource.createSource()))
+    Dataset.newDataFrame(sqlContext, StreamingRelation(dataSource.createSource()))
   }
 
   /**
@@ -345,7 +345,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
       InferSchema.infer(jsonRDD, sqlContext.conf.columnNameOfCorruptRecord, parsedOptions)
     }
 
-    DataFrame(
+    Dataset.newDataFrame(
       sqlContext,
       LogicalRDD(
         schema.toAttributes,
@@ -393,7 +393,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * @since 1.4.0
    */
   def table(tableName: String): DataFrame = {
-    DataFrame(sqlContext,
+    Dataset.newDataFrame(sqlContext,
       sqlContext.catalog.lookupRelation(sqlContext.sqlParser.parseTableIdentifier(tableName)))
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
index 2a0f77349a..04d277bed2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
@@ -55,17 +55,17 @@ class GroupedData protected[sql](
 
     groupType match {
       case GroupedData.GroupByType =>
-        DataFrame(
+        Dataset.newDataFrame(
           df.sqlContext, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan))
       case GroupedData.RollupType =>
-        DataFrame(
+        Dataset.newDataFrame(
           df.sqlContext, Aggregate(Seq(Rollup(groupingExprs)), aliasedAgg, df.logicalPlan))
       case GroupedData.CubeType =>
-        DataFrame(
+        Dataset.newDataFrame(
           df.sqlContext, Aggregate(Seq(Cube(groupingExprs)), aliasedAgg, df.logicalPlan))
       case GroupedData.PivotType(pivotCol, values) =>
         val aliasedGrps = groupingExprs.map(alias)
-        DataFrame(
+        Dataset.newDataFrame(
           df.sqlContext, Pivot(aliasedGrps, pivotCol, values, aggExprs, df.logicalPlan))
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
index 1639cc8db6..472ae716f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
@@ -64,7 +64,7 @@ class GroupedDataset[K, V] private[sql](
 
   private def groupedData =
     new GroupedData(
-      DataFrame(sqlContext, logicalPlan), groupingAttributes, GroupedData.GroupByType)
+      Dataset.newDataFrame(sqlContext, logicalPlan), groupingAttributes, GroupedData.GroupByType)
 
   /**
    * Returns a new [[GroupedDataset]] where the type of the key has been mapped to the specified
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 54dbd6bda5..49a70a7c5f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -374,7 +374,7 @@ class SQLContext private[sql](
     val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
     val attributeSeq = schema.toAttributes
     val rowRDD = RDDConversions.productToRowRdd(rdd, schema.map(_.dataType))
-    DataFrame(self, LogicalRDD(attributeSeq, rowRDD)(self))
+    Dataset.newDataFrame(self, LogicalRDD(attributeSeq, rowRDD)(self))
   }
 
   /**
@@ -389,7 +389,7 @@ class SQLContext private[sql](
     SQLContext.setActive(self)
     val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
     val attributeSeq = schema.toAttributes
-    DataFrame(self, LocalRelation.fromProduct(attributeSeq, data))
+    Dataset.newDataFrame(self, LocalRelation.fromProduct(attributeSeq, data))
   }
 
   /**
@@ -399,7 +399,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
-    DataFrame(this, LogicalRelation(baseRelation))
+    Dataset.newDataFrame(this, LogicalRelation(baseRelation))
   }
 
   /**
@@ -454,7 +454,7 @@ class SQLContext private[sql](
       rowRDD.map{r: Row => InternalRow.fromSeq(r.toSeq)}
     }
     val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
-    DataFrame(this, logicalPlan)
+    Dataset.newDataFrame(this, logicalPlan)
   }
 
 
@@ -489,7 +489,7 @@ class SQLContext private[sql](
     // TODO: use MutableProjection when rowRDD is another DataFrame and the applied
     // schema differs from the existing schema on any field data type.
     val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
-    DataFrame(this, logicalPlan)
+    Dataset.newDataFrame(this, logicalPlan)
   }
 
   /**
@@ -517,7 +517,7 @@ class SQLContext private[sql](
    */
  @DeveloperApi
  def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame = {
-    DataFrame(self, LocalRelation.fromExternalRows(schema.toAttributes, rows.asScala))
+    Dataset.newDataFrame(self, LocalRelation.fromExternalRows(schema.toAttributes, rows.asScala))
   }
 
   /**
@@ -536,7 +536,7 @@ class SQLContext private[sql](
       val localBeanInfo = Introspector.getBeanInfo(Utils.classForName(className))
       SQLContext.beansToRows(iter, localBeanInfo, attributeSeq)
     }
-    DataFrame(this, LogicalRDD(attributeSeq, rowRdd)(this))
+    Dataset.newDataFrame(this, LogicalRDD(attributeSeq, rowRdd)(this))
   }
 
   /**
@@ -564,7 +564,7 @@ class SQLContext private[sql](
     val className = beanClass.getName
     val beanInfo = Introspector.getBeanInfo(beanClass)
     val rows = SQLContext.beansToRows(data.asScala.iterator, beanInfo, attrSeq)
-    DataFrame(self, LocalRelation(attrSeq, rows.toSeq))
+    Dataset.newDataFrame(self, LocalRelation(attrSeq, rows.toSeq))
   }
 
   /**
@@ -770,7 +770,7 @@ class SQLContext private[sql](
    */
  @Experimental
  def range(start: Long, end: Long, step: Long, numPartitions: Int): DataFrame = {
-    DataFrame(this, Range(start, end, step, numPartitions))
+    Dataset.newDataFrame(this, Range(start, end, step, numPartitions))
   }
 
   /**
@@ -781,7 +781,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def sql(sqlText: String): DataFrame = {
-    DataFrame(this, parseSql(sqlText))
+    Dataset.newDataFrame(this, parseSql(sqlText))
   }
 
   /**
@@ -795,7 +795,7 @@ class SQLContext private[sql](
   }
 
   private def table(tableIdent: TableIdentifier): DataFrame = {
-    DataFrame(this, catalog.lookupRelation(tableIdent))
+    Dataset.newDataFrame(this, catalog.lookupRelation(tableIdent))
   }
 
   /**
@@ -807,7 +807,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def tables(): DataFrame = {
-    DataFrame(this, ShowTablesCommand(None))
+    Dataset.newDataFrame(this, ShowTablesCommand(None))
   }
 
   /**
@@ -819,7 +819,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def tables(databaseName: String): DataFrame = {
-    DataFrame(this, ShowTablesCommand(Some(databaseName)))
+    Dataset.newDataFrame(this, ShowTablesCommand(Some(databaseName)))
   }
 
   /**
@@ -886,7 +886,7 @@ class SQLContext private[sql](
       schema: StructType): DataFrame = {
     val rowRdd = rdd.map(r =>
       python.EvaluatePython.fromJava(r, schema).asInstanceOf[InternalRow])
-    DataFrame(this, LogicalRDD(schema.toAttributes, rowRdd)(self))
+    Dataset.newDataFrame(this, LogicalRDD(schema.toAttributes, rowRdd)(self))
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 877456ca48..54cdcb10ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -21,7 +21,7 @@ import java.util.NoSuchElementException
 
 import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{Dataset, Row, SQLContext}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -252,7 +252,7 @@ case class CacheTableCommand(
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
     plan.foreach { logicalPlan =>
-      sqlContext.registerDataFrameAsTable(DataFrame(sqlContext, logicalPlan), tableName)
+      sqlContext.registerDataFrameAsTable(Dataset.newDataFrame(sqlContext, logicalPlan), tableName)
     }
     sqlContext.cacheTable(tableName)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 60ec67c8f0..887f5469b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.Logging
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.execution.streaming.{FileStreamSource, Sink, Source}
 import org.apache.spark.sql.sources._
@@ -154,7 +154,7 @@ case class DataSource(
     }
 
     def dataFrameBuilder(files: Array[String]): DataFrame = {
-      DataFrame(
+      Dataset.newDataFrame(
        sqlContext,
        LogicalRelation(
          DataSource(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
index abb1628590..9cf794804d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
@@ -34,7 +34,7 @@ private[sql] case class InsertIntoDataSource(
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
-    val data = DataFrame(sqlContext, query)
+    val data = Dataset.newDataFrame(sqlContext, query)
     // Apply the schema of the existing table to the new data.
     val df = sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
     relation.insert(df, overwrite)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
index fb52730104..51ec969daf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -114,7 +114,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
       val partitionSet = AttributeSet(partitionColumns)
       val dataColumns = query.output.filterNot(partitionSet.contains)
 
-      val queryExecution = DataFrame(sqlContext, query).queryExecution
+      val queryExecution = Dataset.newDataFrame(sqlContext, query).queryExecution
       SQLExecution.withNewExecutionId(sqlContext, queryExecution) {
         val relation =
           WriteRelation(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 895794c4c2..903c9913ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import org.apache.spark.sql.{DataFrame, Row, SaveMode, SQLContext}
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical
@@ -100,7 +100,7 @@ case class CreateTempTableUsing(
         options = options)
     sqlContext.catalog.registerTable(
       tableIdent,
-      DataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan)
+      Dataset.newDataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan)
 
     Seq.empty[Row]
   }
@@ -115,7 +115,7 @@ case class CreateTempTableUsingAsSelect(
     query: LogicalPlan) extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
-    val df = DataFrame(sqlContext, query)
+    val df = Dataset.newDataFrame(sqlContext, query)
     val dataSource = DataSource(
       sqlContext,
       className = provider,
@@ -125,7 +125,7 @@ case class CreateTempTableUsingAsSelect(
     val result = dataSource.write(mode, df)
     sqlContext.catalog.registerTable(
       tableIdent,
-      DataFrame(sqlContext, LogicalRelation(result)).logicalPlan)
+      Dataset.newDataFrame(sqlContext, LogicalRelation(result)).logicalPlan)
 
     Seq.empty[Row]
   }
@@ -146,7 +146,7 @@ case class RefreshTable(tableIdent: TableIdentifier)
     if (isCached) {
       // Create a data frame to represent the table.
       // TODO: Use uncacheTable once it supports database name.
-      val df = DataFrame(sqlContext, logicalPlan)
+      val df = Dataset.newDataFrame(sqlContext, logicalPlan)
       // Uncache the logicalPlan.
       sqlContext.cacheManager.tryUncacheQuery(df, blocking = true)
       // Cache it again.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
index 0dc34814fb..b912c5cc1d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.stat
 import scala.collection.mutable.{Map => MutableMap}
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.{Column, DataFrame, Row}
+import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.types._
 
@@ -121,6 +121,6 @@ private[sql] object FrequentItems extends Logging {
       StructField(v._1 + "_freqItems", ArrayType(v._2, false))
     }
     val schema = StructType(outputCols).toAttributes
-    DataFrame(df.sqlContext, LocalRelation.fromExternalRows(schema, Seq(resultRow)))
+    Dataset.newDataFrame(df.sqlContext, LocalRelation.fromExternalRows(schema, Seq(resultRow)))
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
index daa065e5cd..71fd185b16 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.stat
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.{Column, DataFrame, Row}
+import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
 import org.apache.spark.sql.catalyst.expressions.{Cast, GenericMutableRow}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.functions._
@@ -454,6 +454,6 @@ private[sql] object StatFunctions extends Logging {
     }
     val schema = StructType(StructField(tableName, StringType) +: headerNames)
 
-    DataFrame(df.sqlContext, LocalRelation(schema.toAttributes, table)).na.fill(0.0)
+    Dataset.newDataFrame(df.sqlContext, LocalRelation(schema.toAttributes, table)).na.fill(0.0)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 7d7c51b158..4a0eb46b22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -211,7 +211,7 @@ class StreamExecution(
 
     // Construct the batch and send it to the sink.
     val batchOffset = streamProgress.toCompositeOffset(sources)
-    val nextBatch = new Batch(batchOffset, DataFrame(sqlContext, newPlan))
+    val nextBatch = new Batch(batchOffset, Dataset.newDataFrame(sqlContext, newPlan))
     sink.addBatch(nextBatch)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 3b764c5558..096477ce0e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -59,7 +59,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   }
 
   def toDF()(implicit sqlContext: SQLContext): DataFrame = {
-    DataFrame(sqlContext, logicalPlan)
+    Dataset.newDataFrame(sqlContext, logicalPlan)
   }
 
   def addData(data: A*): Offset = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 86412c3489..737e125f6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -936,7 +936,7 @@ object functions extends LegacyFunctions {
    * @since 1.5.0
    */
   def broadcast(df: DataFrame): DataFrame = {
-    DataFrame(df.sqlContext, BroadcastHint(df.logicalPlan))
+    Dataset.newDataFrame(df.sqlContext, BroadcastHint(df.logicalPlan))
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index f4a5107eaf..46cd380a79 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -933,7 +933,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     assert(e2.getMessage.contains("Inserting into an RDD-based table is not allowed."))
 
     // error case: insert into an OneRowRelation
-    DataFrame(sqlContext, OneRowRelation).registerTempTable("one_row")
+    Dataset.newDataFrame(sqlContext, OneRowRelation).registerTempTable("one_row")
     val e3 = intercept[AnalysisException] {
       insertion.write.insertInto("one_row")
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
index 493a5a6437..7a5b639115 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
@@ -65,7 +65,7 @@ import org.apache.spark.sql.execution.streaming._
 trait StreamTest extends QueryTest with Timeouts {
 
   implicit class RichSource(s: Source) {
-    def toDF(): DataFrame = DataFrame(sqlContext, StreamingRelation(s))
+    def toDF(): DataFrame = Dataset.newDataFrame(sqlContext, StreamingRelation(s))
 
     def toDS[A: Encoder](): Dataset[A] = Dataset(sqlContext, StreamingRelation(s))
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index a7592e5d8d..2bce74571d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -219,7 +219,7 @@ private[sql] trait SQLTestUtils
    * way to construct [[DataFrame]] directly out of local data without relying on implicits.
   */
  protected implicit def logicalPlanToSparkQuery(plan: LogicalPlan): DataFrame = {
-    DataFrame(sqlContext, plan)
+    Dataset.newDataFrame(sqlContext, plan)
   }
 
   /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 7e4fb8b0ac..c4723fcb82 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -246,7 +246,7 @@ case class CreateMetastoreDataSourceAsSelect(
         createMetastoreTable = true
     }
 
-    val data = DataFrame(hiveContext, query)
+    val data = Dataset.newDataFrame(hiveContext, query)
     val df = existingSchema match {
       // If we are inserting into an existing table, just use the existing schema.
       case Some(s) => sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, s)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SQLBuilderTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SQLBuilderTest.scala
index a0a0d134da..047e82e411 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SQLBuilderTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SQLBuilderTest.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive
 
 import scala.util.control.NonFatal
 
-import org.apache.spark.sql.{DataFrame, QueryTest}
+import org.apache.spark.sql.{DataFrame, Dataset, QueryTest}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -63,7 +63,7 @@ abstract class SQLBuilderTest extends QueryTest with TestHiveSingleton {
        """.stripMargin)
     }
 
-    checkAnswer(sqlContext.sql(generatedSQL), DataFrame(sqlContext, plan))
+    checkAnswer(sqlContext.sql(generatedSQL), Dataset.newDataFrame(sqlContext, plan))
   }
 
   protected def checkSQL(df: DataFrame, expectedSQL: String): Unit = {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index d5a4295d62..5c26aa1a79 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -968,7 +968,7 @@ class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQue
         // Create a new df to make sure its physical operator picks up
         // spark.sql.TungstenAggregate.testFallbackStartsAt.
         // todo: remove it?
-        val newActual = DataFrame(sqlContext, actual.logicalPlan)
+        val newActual = Dataset.newDataFrame(sqlContext, actual.logicalPlan)
 
         QueryTest.checkAnswer(newActual, expectedAnswer) match {
           case Some(errorMessage) =>
--
cgit v1.2.3
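The `MimaExcludes.scala` side of this change is not shown above, since the diff is limited to `sql`. As a rough sketch only, assuming the standard sbt-mima-plugin `ProblemFilters` API and with illustrative (not copied from the actual commit) entries, the kind of `MissingClassProblem` filter the commit message refers to looks like this:

```scala
// Illustrative sketch, not the actual project/MimaExcludes.scala entries from this commit.
import com.typesafe.tools.mima.core.{MissingClassProblem, ProblemFilters}

object ExampleMimaExcludes {
  // With object DataFrame gone and the new factory kept private[sql]
  // (Dataset.newDataFrame), the old DataFrame and DataFrame$ class files no
  // longer exist in the published API. A MissingClassProblem exclusion tells
  // MiMa to stop reporting binary-compatibility errors for those class names.
  val excludes = Seq(
    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrame"),
    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrame$")
  )
}
```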