aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala27
1 files changed, 18 insertions, 9 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 969fcdf428..ac2ca3c5a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -134,15 +134,24 @@ class Dataset[T] private[sql](
this(sqlContext, sqlContext.executePlan(logicalPlan), encoder)
}
- @transient protected[sql] val logicalPlan: LogicalPlan = queryExecution.logical match {
- // For various commands (like DDL) and queries with side effects, we force query optimization to
- // happen right away to let these side effects take place eagerly.
- case _: Command |
- _: InsertIntoTable |
- _: CreateTableUsingAsSelect =>
- LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
- case _ =>
- queryExecution.analyzed
+ @transient protected[sql] val logicalPlan: LogicalPlan = {
+ def hasSideEffects(plan: LogicalPlan): Boolean = plan match {
+ case _: Command |
+ _: InsertIntoTable |
+ _: CreateTableUsingAsSelect => true
+ case _ => false
+ }
+
+ queryExecution.logical match {
+ // For various commands (like DDL) and queries with side effects, we force query execution
+ // to happen right away to let these side effects take place eagerly.
+ case p if hasSideEffects(p) =>
+ LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
+ case Union(children) if children.forall(hasSideEffects) =>
+ LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
+ case _ =>
+ queryExecution.analyzed
+ }
}
/**