diff options
Diffstat (limited to 'sql/core/src')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 27 |
1 files changed, 18 insertions, 9 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 969fcdf428..ac2ca3c5a3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -134,15 +134,24 @@ class Dataset[T] private[sql]( this(sqlContext, sqlContext.executePlan(logicalPlan), encoder) } - @transient protected[sql] val logicalPlan: LogicalPlan = queryExecution.logical match { - // For various commands (like DDL) and queries with side effects, we force query optimization to - // happen right away to let these side effects take place eagerly. - case _: Command | - _: InsertIntoTable | - _: CreateTableUsingAsSelect => - LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext) - case _ => - queryExecution.analyzed + @transient protected[sql] val logicalPlan: LogicalPlan = { + def hasSideEffects(plan: LogicalPlan): Boolean = plan match { + case _: Command | + _: InsertIntoTable | + _: CreateTableUsingAsSelect => true + case _ => false + } + + queryExecution.logical match { + // For various commands (like DDL) and queries with side effects, we force query execution + // to happen right away to let these side effects take place eagerly. + case p if hasSideEffects(p) => + LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext) + case Union(children) if children.forall(hasSideEffects) => + LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext) + case _ => + queryExecution.analyzed + } } /** |