author     Cheng Lian <lian@databricks.com>    2016-03-11 22:17:50 +0800
committer  Cheng Lian <lian@databricks.com>    2016-03-11 22:17:50 +0800
commit     6d37e1eb90054cdb6323b75fb202f78ece604b15 (patch)
tree       1a93192d453c0ad68929b38fd1346af82314131b /sql/core/src/main/scala/org
parent     07f1c5447753a3d593cd6ececfcb03c11b1cf8ff (diff)
[SPARK-13817][BUILD][SQL] Re-enable MiMA and removes object DataFrame
## What changes were proposed in this pull request?

PR #11443 temporarily disabled the MiMA check; this PR re-enables it.

One extra change is that `object DataFrame` is also removed. The only purpose of introducing `object DataFrame` was to use it as an internal factory for creating `Dataset[Row]`. By replacing this internal factory with `Dataset.newDataFrame`, both `DataFrame` and `DataFrame$` are entirely removed from the API, so that we can simply put a `MissingClassProblem` filter in `MimaExcludes.scala` for most `DataFrame` API changes.

## How was this patch tested?

Tested by the MiMA check triggered by Jenkins.

Author: Cheng Lian <lian@databricks.com>

Closes #11656 from liancheng/re-enable-mima.
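To make the mechanism concrete, the sketch below models the relocated factory in a self-contained form. The stub `SQLContext`, `LogicalPlan`, `QueryExecution`, `Encoder`, and `Row` types are simplified stand-ins rather than Spark's real classes; only the shape of `Dataset.newDataFrame` plus the `type DataFrame = Dataset[Row]` alias mirrors the patch.

```scala
// Minimal, self-contained sketch (NOT Spark's real API): stub types stand in for
// SQLContext, LogicalPlan, RowEncoder and Row so the shape of the change compiles on its own.
object NewDataFrameSketch {
  class LogicalPlan
  class Row
  class Encoder[T]
  class QueryExecution(val plan: LogicalPlan) {
    def assertAnalyzed(): Unit = ()  // the real QueryExecution validates the analyzed plan here
  }
  class SQLContext {
    def executePlan(plan: LogicalPlan): QueryExecution = new QueryExecution(plan)
  }

  class Dataset[T](val sqlContext: SQLContext, val plan: LogicalPlan, val encoder: Encoder[T])

  // DataFrame stays a plain type alias, so no DataFrame or DataFrame$ class file is emitted.
  type DataFrame = Dataset[Row]

  object Dataset {
    // Internal factory that takes over from the removed `object DataFrame.apply`.
    def newDataFrame(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
      val qe = sqlContext.executePlan(logicalPlan)
      qe.assertAnalyzed()
      new Dataset[Row](sqlContext, logicalPlan, new Encoder[Row])
    }
  }

  def main(args: Array[String]): Unit = {
    val df: DataFrame = Dataset.newDataFrame(new SQLContext, new LogicalPlan)
    println(df.getClass.getName)  // prints a Dataset class name; DataFrame exists only at compile time
  }
}
```

With `DataFrame` and `DataFrame$` gone from the bytecode, the corresponding MiMA exclusions should reduce to `MissingClassProblem` filters such as `ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrame$")` (illustrative only; see `MimaExcludes.scala` in the PR for the exact entries).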
Diffstat (limited to 'sql/core/src/main/scala/org')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala                                  | 16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala                            |  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala                                |  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala                             |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                                 | 28
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala                 |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala           |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala                  | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala               |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala               |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala        |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala                 |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/functions.scala                                  |  2
15 files changed, 48 insertions(+), 50 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 17a91975f4..f1791e6943 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -48,18 +48,16 @@ import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
-private[sql] object DataFrame {
- def apply(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
- val qe = sqlContext.executePlan(logicalPlan)
- qe.assertAnalyzed()
- new Dataset[Row](sqlContext, logicalPlan, RowEncoder(qe.analyzed.schema))
- }
-}
-
private[sql] object Dataset {
def apply[T: Encoder](sqlContext: SQLContext, logicalPlan: LogicalPlan): Dataset[T] = {
new Dataset(sqlContext, logicalPlan, implicitly[Encoder[T]])
}
+
+ def newDataFrame(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
+ val qe = sqlContext.executePlan(logicalPlan)
+ qe.assertAnalyzed()
+ new Dataset[Row](sqlContext, logicalPlan, RowEncoder(qe.analyzed.schema))
+ }
}
/**
@@ -2129,7 +2127,7 @@ class Dataset[T] private[sql](
/** A convenient function to wrap a logical plan and produce a DataFrame. */
@inline private def withPlan(logicalPlan: => LogicalPlan): DataFrame = {
- DataFrame(sqlContext, logicalPlan)
+ Dataset.newDataFrame(sqlContext, logicalPlan)
}
/** A convenient function to wrap a logical plan and produce a DataFrame. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 822702429d..52b567ea25 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -128,7 +128,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
userSpecifiedSchema = userSpecifiedSchema,
className = source,
options = extraOptions.toMap)
- DataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation()))
+ Dataset.newDataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation()))
}
/**
@@ -175,7 +175,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
userSpecifiedSchema = userSpecifiedSchema,
className = source,
options = extraOptions.toMap)
- DataFrame(sqlContext, StreamingRelation(dataSource.createSource()))
+ Dataset.newDataFrame(sqlContext, StreamingRelation(dataSource.createSource()))
}
/**
@@ -345,7 +345,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
InferSchema.infer(jsonRDD, sqlContext.conf.columnNameOfCorruptRecord, parsedOptions)
}
- DataFrame(
+ Dataset.newDataFrame(
sqlContext,
LogicalRDD(
schema.toAttributes,
@@ -393,7 +393,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
* @since 1.4.0
*/
def table(tableName: String): DataFrame = {
- DataFrame(sqlContext,
+ Dataset.newDataFrame(sqlContext,
sqlContext.catalog.lookupRelation(sqlContext.sqlParser.parseTableIdentifier(tableName)))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
index 2a0f77349a..04d277bed2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
@@ -55,17 +55,17 @@ class GroupedData protected[sql](
groupType match {
case GroupedData.GroupByType =>
- DataFrame(
+ Dataset.newDataFrame(
df.sqlContext, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan))
case GroupedData.RollupType =>
- DataFrame(
+ Dataset.newDataFrame(
df.sqlContext, Aggregate(Seq(Rollup(groupingExprs)), aliasedAgg, df.logicalPlan))
case GroupedData.CubeType =>
- DataFrame(
+ Dataset.newDataFrame(
df.sqlContext, Aggregate(Seq(Cube(groupingExprs)), aliasedAgg, df.logicalPlan))
case GroupedData.PivotType(pivotCol, values) =>
val aliasedGrps = groupingExprs.map(alias)
- DataFrame(
+ Dataset.newDataFrame(
df.sqlContext, Pivot(aliasedGrps, pivotCol, values, aggExprs, df.logicalPlan))
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
index 1639cc8db6..472ae716f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
@@ -64,7 +64,7 @@ class GroupedDataset[K, V] private[sql](
private def groupedData =
new GroupedData(
- DataFrame(sqlContext, logicalPlan), groupingAttributes, GroupedData.GroupByType)
+ Dataset.newDataFrame(sqlContext, logicalPlan), groupingAttributes, GroupedData.GroupByType)
/**
* Returns a new [[GroupedDataset]] where the type of the key has been mapped to the specified
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 54dbd6bda5..49a70a7c5f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -374,7 +374,7 @@ class SQLContext private[sql](
val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
val attributeSeq = schema.toAttributes
val rowRDD = RDDConversions.productToRowRdd(rdd, schema.map(_.dataType))
- DataFrame(self, LogicalRDD(attributeSeq, rowRDD)(self))
+ Dataset.newDataFrame(self, LogicalRDD(attributeSeq, rowRDD)(self))
}
/**
@@ -389,7 +389,7 @@ class SQLContext private[sql](
SQLContext.setActive(self)
val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
val attributeSeq = schema.toAttributes
- DataFrame(self, LocalRelation.fromProduct(attributeSeq, data))
+ Dataset.newDataFrame(self, LocalRelation.fromProduct(attributeSeq, data))
}
/**
@@ -399,7 +399,7 @@ class SQLContext private[sql](
* @since 1.3.0
*/
def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
- DataFrame(this, LogicalRelation(baseRelation))
+ Dataset.newDataFrame(this, LogicalRelation(baseRelation))
}
/**
@@ -454,7 +454,7 @@ class SQLContext private[sql](
rowRDD.map{r: Row => InternalRow.fromSeq(r.toSeq)}
}
val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
- DataFrame(this, logicalPlan)
+ Dataset.newDataFrame(this, logicalPlan)
}
@@ -489,7 +489,7 @@ class SQLContext private[sql](
// TODO: use MutableProjection when rowRDD is another DataFrame and the applied
// schema differs from the existing schema on any field data type.
val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
- DataFrame(this, logicalPlan)
+ Dataset.newDataFrame(this, logicalPlan)
}
/**
@@ -517,7 +517,7 @@ class SQLContext private[sql](
*/
@DeveloperApi
def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame = {
- DataFrame(self, LocalRelation.fromExternalRows(schema.toAttributes, rows.asScala))
+ Dataset.newDataFrame(self, LocalRelation.fromExternalRows(schema.toAttributes, rows.asScala))
}
/**
@@ -536,7 +536,7 @@ class SQLContext private[sql](
val localBeanInfo = Introspector.getBeanInfo(Utils.classForName(className))
SQLContext.beansToRows(iter, localBeanInfo, attributeSeq)
}
- DataFrame(this, LogicalRDD(attributeSeq, rowRdd)(this))
+ Dataset.newDataFrame(this, LogicalRDD(attributeSeq, rowRdd)(this))
}
/**
@@ -564,7 +564,7 @@ class SQLContext private[sql](
val className = beanClass.getName
val beanInfo = Introspector.getBeanInfo(beanClass)
val rows = SQLContext.beansToRows(data.asScala.iterator, beanInfo, attrSeq)
- DataFrame(self, LocalRelation(attrSeq, rows.toSeq))
+ Dataset.newDataFrame(self, LocalRelation(attrSeq, rows.toSeq))
}
/**
@@ -770,7 +770,7 @@ class SQLContext private[sql](
*/
@Experimental
def range(start: Long, end: Long, step: Long, numPartitions: Int): DataFrame = {
- DataFrame(this, Range(start, end, step, numPartitions))
+ Dataset.newDataFrame(this, Range(start, end, step, numPartitions))
}
/**
@@ -781,7 +781,7 @@ class SQLContext private[sql](
* @since 1.3.0
*/
def sql(sqlText: String): DataFrame = {
- DataFrame(this, parseSql(sqlText))
+ Dataset.newDataFrame(this, parseSql(sqlText))
}
/**
@@ -795,7 +795,7 @@ class SQLContext private[sql](
}
private def table(tableIdent: TableIdentifier): DataFrame = {
- DataFrame(this, catalog.lookupRelation(tableIdent))
+ Dataset.newDataFrame(this, catalog.lookupRelation(tableIdent))
}
/**
@@ -807,7 +807,7 @@ class SQLContext private[sql](
* @since 1.3.0
*/
def tables(): DataFrame = {
- DataFrame(this, ShowTablesCommand(None))
+ Dataset.newDataFrame(this, ShowTablesCommand(None))
}
/**
@@ -819,7 +819,7 @@ class SQLContext private[sql](
* @since 1.3.0
*/
def tables(databaseName: String): DataFrame = {
- DataFrame(this, ShowTablesCommand(Some(databaseName)))
+ Dataset.newDataFrame(this, ShowTablesCommand(Some(databaseName)))
}
/**
@@ -886,7 +886,7 @@ class SQLContext private[sql](
schema: StructType): DataFrame = {
val rowRdd = rdd.map(r => python.EvaluatePython.fromJava(r, schema).asInstanceOf[InternalRow])
- DataFrame(this, LogicalRDD(schema.toAttributes, rowRdd)(self))
+ Dataset.newDataFrame(this, LogicalRDD(schema.toAttributes, rowRdd)(self))
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 877456ca48..54cdcb10ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -21,7 +21,7 @@ import java.util.NoSuchElementException
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{Dataset, Row, SQLContext}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -252,7 +252,7 @@ case class CacheTableCommand(
override def run(sqlContext: SQLContext): Seq[Row] = {
plan.foreach { logicalPlan =>
- sqlContext.registerDataFrameAsTable(DataFrame(sqlContext, logicalPlan), tableName)
+ sqlContext.registerDataFrameAsTable(Dataset.newDataFrame(sqlContext, logicalPlan), tableName)
}
sqlContext.cacheTable(tableName)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 60ec67c8f0..887f5469b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.Logging
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.execution.streaming.{FileStreamSource, Sink, Source}
import org.apache.spark.sql.sources._
@@ -154,7 +154,7 @@ case class DataSource(
}
def dataFrameBuilder(files: Array[String]): DataFrame = {
- DataFrame(
+ Dataset.newDataFrame(
sqlContext,
LogicalRelation(
DataSource(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
index abb1628590..9cf794804d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
@@ -34,7 +34,7 @@ private[sql] case class InsertIntoDataSource(
override def run(sqlContext: SQLContext): Seq[Row] = {
val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
- val data = DataFrame(sqlContext, query)
+ val data = Dataset.newDataFrame(sqlContext, query)
// Apply the schema of the existing table to the new data.
val df = sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
relation.insert(df, overwrite)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
index fb52730104..51ec969daf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -114,7 +114,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
val partitionSet = AttributeSet(partitionColumns)
val dataColumns = query.output.filterNot(partitionSet.contains)
- val queryExecution = DataFrame(sqlContext, query).queryExecution
+ val queryExecution = Dataset.newDataFrame(sqlContext, query).queryExecution
SQLExecution.withNewExecutionId(sqlContext, queryExecution) {
val relation =
WriteRelation(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 895794c4c2..903c9913ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.datasources
-import org.apache.spark.sql.{DataFrame, Row, SaveMode, SQLContext}
+import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical
@@ -100,7 +100,7 @@ case class CreateTempTableUsing(
options = options)
sqlContext.catalog.registerTable(
tableIdent,
- DataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan)
+ Dataset.newDataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan)
Seq.empty[Row]
}
@@ -115,7 +115,7 @@ case class CreateTempTableUsingAsSelect(
query: LogicalPlan) extends RunnableCommand {
override def run(sqlContext: SQLContext): Seq[Row] = {
- val df = DataFrame(sqlContext, query)
+ val df = Dataset.newDataFrame(sqlContext, query)
val dataSource = DataSource(
sqlContext,
className = provider,
@@ -125,7 +125,7 @@ case class CreateTempTableUsingAsSelect(
val result = dataSource.write(mode, df)
sqlContext.catalog.registerTable(
tableIdent,
- DataFrame(sqlContext, LogicalRelation(result)).logicalPlan)
+ Dataset.newDataFrame(sqlContext, LogicalRelation(result)).logicalPlan)
Seq.empty[Row]
}
@@ -146,7 +146,7 @@ case class RefreshTable(tableIdent: TableIdentifier)
if (isCached) {
// Create a data frame to represent the table.
// TODO: Use uncacheTable once it supports database name.
- val df = DataFrame(sqlContext, logicalPlan)
+ val df = Dataset.newDataFrame(sqlContext, logicalPlan)
// Uncache the logicalPlan.
sqlContext.cacheManager.tryUncacheQuery(df, blocking = true)
// Cache it again.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
index 0dc34814fb..b912c5cc1d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.stat
import scala.collection.mutable.{Map => MutableMap}
import org.apache.spark.Logging
-import org.apache.spark.sql.{Column, DataFrame, Row}
+import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.types._
@@ -121,6 +121,6 @@ private[sql] object FrequentItems extends Logging {
StructField(v._1 + "_freqItems", ArrayType(v._2, false))
}
val schema = StructType(outputCols).toAttributes
- DataFrame(df.sqlContext, LocalRelation.fromExternalRows(schema, Seq(resultRow)))
+ Dataset.newDataFrame(df.sqlContext, LocalRelation.fromExternalRows(schema, Seq(resultRow)))
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
index daa065e5cd..71fd185b16 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.stat
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.Logging
-import org.apache.spark.sql.{Column, DataFrame, Row}
+import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
import org.apache.spark.sql.catalyst.expressions.{Cast, GenericMutableRow}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.functions._
@@ -454,6 +454,6 @@ private[sql] object StatFunctions extends Logging {
}
val schema = StructType(StructField(tableName, StringType) +: headerNames)
- DataFrame(df.sqlContext, LocalRelation(schema.toAttributes, table)).na.fill(0.0)
+ Dataset.newDataFrame(df.sqlContext, LocalRelation(schema.toAttributes, table)).na.fill(0.0)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 7d7c51b158..4a0eb46b22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -211,7 +211,7 @@ class StreamExecution(
// Construct the batch and send it to the sink.
val batchOffset = streamProgress.toCompositeOffset(sources)
- val nextBatch = new Batch(batchOffset, DataFrame(sqlContext, newPlan))
+ val nextBatch = new Batch(batchOffset, Dataset.newDataFrame(sqlContext, newPlan))
sink.addBatch(nextBatch)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 3b764c5558..096477ce0e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -59,7 +59,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
}
def toDF()(implicit sqlContext: SQLContext): DataFrame = {
- DataFrame(sqlContext, logicalPlan)
+ Dataset.newDataFrame(sqlContext, logicalPlan)
}
def addData(data: A*): Offset = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 86412c3489..737e125f6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -936,7 +936,7 @@ object functions extends LegacyFunctions {
* @since 1.5.0
*/
def broadcast(df: DataFrame): DataFrame = {
- DataFrame(df.sqlContext, BroadcastHint(df.logicalPlan))
+ Dataset.newDataFrame(df.sqlContext, BroadcastHint(df.logicalPlan))
}
/**