author     Reynold Xin <rxin@databricks.com>   2015-11-05 11:58:13 -0800
committer  Yin Huai <yhuai@databricks.com>     2015-11-05 11:58:13 -0800
commit     6b87acd6649a3390b5c2c4fcb61e58d125d0d87c (patch)
tree       ed4a922c00ff6cc68555a2cc4f824c2a5f647eb7 /sql
parent     c76865c6220e3e7b2a266bbc4935567ef55303d8 (diff)
[SPARK-11513][SQL] Remove implicit conversion from LogicalPlan to DataFrame
This internal implicit conversion has been a source of confusion for a lot of new developers.

Author: Reynold Xin <rxin@databricks.com>

Closes #9479 from rxin/SPARK-11513.
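For readers new to the change: the removed implicit let any method whose body produced a LogicalPlan compile as if it returned a DataFrame, hiding where plans were wrapped. Below is a self-contained sketch of the before/after pattern; Ctx, Plan, LimitPlan, and Frame are simplified stand-ins for illustration, not Spark's real classes.

// A self-contained sketch of the pattern this commit replaces. Ctx, Plan,
// LimitPlan and Frame are simplified stand-ins, not Spark's real classes.
object ImplicitVsExplicit {
  class Ctx
  sealed trait Plan
  case object Leaf extends Plan
  case class LimitPlan(n: Int, child: Plan) extends Plan

  class Frame(val ctx: Ctx, val plan: Plan) {
    // Before: an implicit conversion silently wrapped any Plan in a Frame,
    // so a method body could build a bare Plan while its signature claimed
    // to return Frame:
    //   @inline private implicit def planToFrame(p: Plan): Frame = new Frame(ctx, p)
    //   def limit(n: Int): Frame = LimitPlan(n, plan)  // conversion fires invisibly
    //
    // After: an explicit helper makes every wrapping site visible.
    @inline private def withPlan(p: => Plan): Frame = new Frame(ctx, p)

    def limit(n: Int): Frame = withPlan { LimitPlan(n, plan) }
  }

  def main(args: Array[String]): Unit = {
    println(new Frame(new Ctx, Leaf).limit(10).plan)  // prints LimitPlan(10,Leaf)
  }
}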
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 123
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala   |   5
2 files changed, 78 insertions(+), 50 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index d3a2249d70..6336dee7be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -147,14 +147,6 @@ class DataFrame private[sql](
queryExecution.analyzed
}
- /**
- * An implicit conversion function internal to this class for us to avoid doing
- * "new DataFrame(...)" everywhere.
- */
- @inline private implicit def logicalPlanToDataFrame(logicalPlan: LogicalPlan): DataFrame = {
- new DataFrame(sqlContext, logicalPlan)
- }
-
protected[sql] def resolve(colName: String): NamedExpression = {
queryExecution.analyzed.resolveQuoted(colName, sqlContext.analyzer.resolver).getOrElse {
throw new AnalysisException(
@@ -235,7 +227,7 @@ class DataFrame private[sql](
// For Data that has more than "numRows" records
if (hasMoreData) {
val rowsString = if (numRows == 1) "row" else "rows"
- sb.append(s"only showing top $numRows ${rowsString}\n")
+ sb.append(s"only showing top $numRows $rowsString\n")
}
sb.toString()
@@ -332,7 +324,7 @@ class DataFrame private[sql](
*/
def explain(extended: Boolean): Unit = {
val explain = ExplainCommand(queryExecution.logical, extended = extended)
- explain.queryExecution.executedPlan.executeCollect().foreach {
+ withPlan(explain).queryExecution.executedPlan.executeCollect().foreach {
// scalastyle:off println
r => println(r.getString(0))
// scalastyle:on println
@@ -370,7 +362,7 @@ class DataFrame private[sql](
* @group action
* @since 1.3.0
*/
- def show(numRows: Int): Unit = show(numRows, true)
+ def show(numRows: Int): Unit = show(numRows, truncate = true)
/**
* Displays the top 20 rows of [[DataFrame]] in a tabular form. Strings more than 20 characters
@@ -445,7 +437,7 @@ class DataFrame private[sql](
* @group dfops
* @since 1.3.0
*/
- def join(right: DataFrame): DataFrame = {
+ def join(right: DataFrame): DataFrame = withPlan {
Join(logicalPlan, right.logicalPlan, joinType = Inner, None)
}
@@ -520,21 +512,25 @@ class DataFrame private[sql](
Join(logicalPlan, right.logicalPlan, joinType = Inner, None)).analyzed.asInstanceOf[Join]
// Project only one of the join columns.
- val joinedCols = usingColumns.map(col => joined.right.resolve(col))
+ val joinedCols = usingColumns.map(col => withPlan(joined.right).resolve(col))
val condition = usingColumns.map { col =>
- catalyst.expressions.EqualTo(joined.left.resolve(col), joined.right.resolve(col))
+ catalyst.expressions.EqualTo(
+ withPlan(joined.left).resolve(col),
+ withPlan(joined.right).resolve(col))
}.reduceLeftOption[catalyst.expressions.BinaryExpression] { (cond, eqTo) =>
catalyst.expressions.And(cond, eqTo)
}
- Project(
- joined.output.filterNot(joinedCols.contains(_)),
- Join(
- joined.left,
- joined.right,
- joinType = JoinType(joinType),
- condition)
- )
+ withPlan {
+ Project(
+ joined.output.filterNot(joinedCols.contains(_)),
+ Join(
+ joined.left,
+ joined.right,
+ joinType = JoinType(joinType),
+ condition)
+ )
+ }
}
/**
@@ -581,19 +577,20 @@ class DataFrame private[sql](
// Trigger analysis so in the case of self-join, the analyzer will clone the plan.
// After the cloning, left and right side will have distinct expression ids.
- val plan = Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr))
+ val plan = withPlan(
+ Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr)))
.queryExecution.analyzed.asInstanceOf[Join]
// If auto self join alias is disabled, return the plan.
if (!sqlContext.conf.dataFrameSelfJoinAutoResolveAmbiguity) {
- return plan
+ return withPlan(plan)
}
// If left/right have no output set intersection, return the plan.
- val lanalyzed = this.logicalPlan.queryExecution.analyzed
- val ranalyzed = right.logicalPlan.queryExecution.analyzed
+ val lanalyzed = withPlan(this.logicalPlan).queryExecution.analyzed
+ val ranalyzed = withPlan(right.logicalPlan).queryExecution.analyzed
if (lanalyzed.outputSet.intersect(ranalyzed.outputSet).isEmpty) {
- return plan
+ return withPlan(plan)
}
// Otherwise, find the trivially true predicates and automatically resolves them to both sides.
@@ -602,9 +599,14 @@ class DataFrame private[sql](
val cond = plan.condition.map { _.transform {
case catalyst.expressions.EqualTo(a: AttributeReference, b: AttributeReference)
if a.sameRef(b) =>
- catalyst.expressions.EqualTo(plan.left.resolve(a.name), plan.right.resolve(b.name))
+ catalyst.expressions.EqualTo(
+ withPlan(plan.left).resolve(a.name),
+ withPlan(plan.right).resolve(b.name))
}}
- plan.copy(condition = cond)
+
+ withPlan {
+ plan.copy(condition = cond)
+ }
}
/**
@@ -707,7 +709,9 @@ class DataFrame private[sql](
* @group dfops
* @since 1.3.0
*/
- def as(alias: String): DataFrame = Subquery(alias, logicalPlan)
+ def as(alias: String): DataFrame = withPlan {
+ Subquery(alias, logicalPlan)
+ }
/**
* (Scala-specific) Returns a new [[DataFrame]] with an alias set.
@@ -739,7 +743,7 @@ class DataFrame private[sql](
* @since 1.3.0
*/
@scala.annotation.varargs
- def select(cols: Column*): DataFrame = {
+ def select(cols: Column*): DataFrame = withPlan {
val namedExpressions = cols.map {
// Wrap UnresolvedAttribute with UnresolvedAlias, as when we resolve UnresolvedAttribute, we
// will remove intermediate Alias for ExtractValue chain, and we need to alias it again to
@@ -798,7 +802,9 @@ class DataFrame private[sql](
* @group dfops
* @since 1.3.0
*/
- def filter(condition: Column): DataFrame = Filter(condition.expr, logicalPlan)
+ def filter(condition: Column): DataFrame = withPlan {
+ Filter(condition.expr, logicalPlan)
+ }
/**
* Filters rows using the given SQL expression.
@@ -1039,7 +1045,9 @@ class DataFrame private[sql](
* @group dfops
* @since 1.3.0
*/
- def limit(n: Int): DataFrame = Limit(Literal(n), logicalPlan)
+ def limit(n: Int): DataFrame = withPlan {
+ Limit(Literal(n), logicalPlan)
+ }
/**
* Returns a new [[DataFrame]] containing union of rows in this frame and another frame.
@@ -1047,7 +1055,9 @@ class DataFrame private[sql](
* @group dfops
* @since 1.3.0
*/
- def unionAll(other: DataFrame): DataFrame = Union(logicalPlan, other.logicalPlan)
+ def unionAll(other: DataFrame): DataFrame = withPlan {
+ Union(logicalPlan, other.logicalPlan)
+ }
/**
* Returns a new [[DataFrame]] containing rows only in both this frame and another frame.
@@ -1055,7 +1065,9 @@ class DataFrame private[sql](
* @group dfops
* @since 1.3.0
*/
- def intersect(other: DataFrame): DataFrame = Intersect(logicalPlan, other.logicalPlan)
+ def intersect(other: DataFrame): DataFrame = withPlan {
+ Intersect(logicalPlan, other.logicalPlan)
+ }
/**
* Returns a new [[DataFrame]] containing rows in this frame but not in another frame.
@@ -1063,7 +1075,9 @@ class DataFrame private[sql](
* @group dfops
* @since 1.3.0
*/
- def except(other: DataFrame): DataFrame = Except(logicalPlan, other.logicalPlan)
+ def except(other: DataFrame): DataFrame = withPlan {
+ Except(logicalPlan, other.logicalPlan)
+ }
/**
* Returns a new [[DataFrame]] by sampling a fraction of rows.
@@ -1074,7 +1088,7 @@ class DataFrame private[sql](
* @group dfops
* @since 1.3.0
*/
- def sample(withReplacement: Boolean, fraction: Double, seed: Long): DataFrame = {
+ def sample(withReplacement: Boolean, fraction: Double, seed: Long): DataFrame = withPlan {
Sample(0.0, fraction, withReplacement, seed, logicalPlan)
}
@@ -1102,7 +1116,7 @@ class DataFrame private[sql](
val sum = weights.sum
val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
normalizedCumWeights.sliding(2).map { x =>
- new DataFrame(sqlContext, Sample(x(0), x(1), false, seed, logicalPlan))
+ new DataFrame(sqlContext, Sample(x(0), x(1), withReplacement = false, seed, logicalPlan))
}.toArray
}
@@ -1162,8 +1176,10 @@ class DataFrame private[sql](
f.andThen(_.map(convert(_).asInstanceOf[InternalRow]))
val generator = UserDefinedGenerator(elementTypes, rowFunction, input.map(_.expr))
- Generate(generator, join = true, outer = false,
- qualifier = None, generatorOutput = Nil, logicalPlan)
+ withPlan {
+ Generate(generator, join = true, outer = false,
+ qualifier = None, generatorOutput = Nil, logicalPlan)
+ }
}
/**
@@ -1190,8 +1206,10 @@ class DataFrame private[sql](
}
val generator = UserDefinedGenerator(elementTypes, rowFunction, apply(inputColumn).expr :: Nil)
- Generate(generator, join = true, outer = false,
- qualifier = None, generatorOutput = Nil, logicalPlan)
+ withPlan {
+ Generate(generator, join = true, outer = false,
+ qualifier = None, generatorOutput = Nil, logicalPlan)
+ }
}
/////////////////////////////////////////////////////////////////////////////
@@ -1309,7 +1327,7 @@ class DataFrame private[sql](
* @group dfops
* @since 1.4.0
*/
- def dropDuplicates(colNames: Seq[String]): DataFrame = {
+ def dropDuplicates(colNames: Seq[String]): DataFrame = withPlan {
val groupCols = colNames.map(resolve)
val groupColExprIds = groupCols.map(_.exprId)
val aggCols = logicalPlan.output.map { attr =>
@@ -1355,7 +1373,7 @@ class DataFrame private[sql](
* @since 1.3.1
*/
@scala.annotation.varargs
- def describe(cols: String*): DataFrame = {
+ def describe(cols: String*): DataFrame = withPlan {
// The list of summary statistics to compute, in the form of expressions.
val statistics = List[(String, Expression => Expression)](
@@ -1505,7 +1523,7 @@ class DataFrame private[sql](
* @group dfops
* @since 1.3.0
*/
- def repartition(numPartitions: Int): DataFrame = {
+ def repartition(numPartitions: Int): DataFrame = withPlan {
Repartition(numPartitions, shuffle = true, logicalPlan)
}
@@ -1519,7 +1537,7 @@ class DataFrame private[sql](
* @since 1.6.0
*/
@scala.annotation.varargs
- def repartition(numPartitions: Int, partitionExprs: Column*): DataFrame = {
+ def repartition(numPartitions: Int, partitionExprs: Column*): DataFrame = withPlan {
RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, Some(numPartitions))
}
@@ -1533,7 +1551,7 @@ class DataFrame private[sql](
* @since 1.6.0
*/
@scala.annotation.varargs
- def repartition(partitionExprs: Column*): DataFrame = {
+ def repartition(partitionExprs: Column*): DataFrame = withPlan {
RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, numPartitions = None)
}
@@ -1545,7 +1563,7 @@ class DataFrame private[sql](
* @group rdd
* @since 1.4.0
*/
- def coalesce(numPartitions: Int): DataFrame = {
+ def coalesce(numPartitions: Int): DataFrame = withPlan {
Repartition(numPartitions, shuffle = false, logicalPlan)
}
@@ -2066,7 +2084,14 @@ class DataFrame private[sql](
SortOrder(expr, Ascending)
}
}
- Sort(sortOrder, global = global, logicalPlan)
+ withPlan {
+ Sort(sortOrder, global = global, logicalPlan)
+ }
+ }
+
+ /** A convenient function to wrap a logical plan and produce a DataFrame. */
+ @inline private def withPlan(logicalPlan: => LogicalPlan): DataFrame = {
+ new DataFrame(sqlContext, logicalPlan)
}
}
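A design detail of the new helper worth noting before the second file: withPlan takes its plan by name (`logicalPlan: => LogicalPlan`), so the `withPlan { ... }` call sites above pass a block whose plan is built only when the DataFrame wraps it. A self-contained sketch of by-name parameters, with Plan as an illustrative stand-in rather than Spark's LogicalPlan:

// By-name vs by-value parameters, self-contained sketch.
object ByNameSketch {
  case class Plan(desc: String)

  // By value: the argument is evaluated before the call.
  def eager(p: Plan): Plan = p

  // By name, like withPlan: evaluation is deferred until p is used, and the
  // caller may pass a multi-statement { ... } block as the argument.
  def deferred(p: => Plan): Plan = p

  def main(args: Array[String]): Unit = {
    println(eager(Plan("scan")).desc)    // prints "scan"
    val wrapped = deferred {
      val base = Plan("scan")            // block syntax reads naturally
      base.copy(desc = base.desc + " -> limit")
    }
    println(wrapped.desc)                // prints "scan -> limit"
  }
}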
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 7b75aeec4c..500227e93a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -107,13 +107,16 @@ class Dataset[T] private(
* strongly typed objects that Dataset operations work on, a DataFrame returns generic [[Row]]
* objects that allow fields to be accessed by ordinal or name.
*/
+ // This is declared with parentheses to prevent the Scala compiler from treating
+ // `ds.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
def toDF(): DataFrame = DataFrame(sqlContext, logicalPlan)
-
/**
* Returns this Dataset.
* @since 1.6.0
*/
+ // This is declared with parentheses to prevent the Scala compiler from treating
+ // `ds.toDS("1")` as invoking this toDS and then apply on the returned Dataset.
def toDS(): Dataset[T] = this
/**
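To see why the empty parameter lists in toDF() and toDS() matter: DataFrame defines apply(colName: String), so if toDF were declared without parentheses, `ds.toDF("1")` would parse as `ds.toDF.apply("1")` and silently return a Column instead of selecting the toDF(colNames: String*) overload. A minimal sketch of the same resolution behavior; Holder is an illustrative stand-in, not Spark's real API.

// Why toDF() and toDS() are declared with empty parameter lists.
object ParensSketch {
  class Holder {
    // Stands in for DataFrame.apply(colName: String): Column.
    def apply(name: String): String = s"column $name"

    // No parameter list: h.bad("1") silently parses as h.bad.apply("1").
    def bad: Holder = this

    // Empty parameter list: h.good("1") cannot match the nilary method, so
    // it resolves to the varargs overload below (or fails to compile if no
    // overload exists) instead of silently invoking apply.
    def good(): Holder = this
    def good(names: String*): Holder = { println(names.mkString(", ")); this }
  }

  def main(args: Array[String]): Unit = {
    val h = new Holder
    println(h.bad("1"))  // prints "column 1": apply kicked in unexpectedly
    h.good("1")          // prints "1": the intended varargs overload is chosen
  }
}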