aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2014-06-07 14:20:33 -0700
committerReynold Xin <rxin@apache.org>2014-06-07 14:20:33 -0700
commita6c72ab16e7a3027739ab419819f5222e270838e (patch)
treee7a6f7f43b3ab0a4c0bcca38fb662fd7880644f6 /sql/core
parent41c4a33105c74417192925db355019ba1badeab2 (diff)
downloadspark-a6c72ab16e7a3027739ab419819f5222e270838e.tar.gz
spark-a6c72ab16e7a3027739ab419819f5222e270838e.tar.bz2
spark-a6c72ab16e7a3027739ab419819f5222e270838e.zip
[SPARK-1994][SQL] Weird data corruption bug when running Spark SQL on data in HDFS
Basically there is a race condition (possibly a scala bug?) when these values are recomputed on all of the slaves that results in an incorrect projection being generated (possibly because the GUID uniqueness contract is broken?). In general we should probably enforce that all expression planing occurs on the driver, as is now occurring here. Author: Michael Armbrust <michael@databricks.com> Closes #1004 from marmbrus/fixAggBug and squashes the following commits: e0c116c [Michael Armbrust] Compute aggregate expression during planning instead of lazily on workers.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala15
1 files changed, 5 insertions, 10 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
index 604914e547..34d88fe4bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
@@ -77,8 +77,7 @@ case class Aggregate(
resultAttribute: AttributeReference)
/** A list of aggregates that need to be computed for each group. */
- @transient
- private[this] lazy val computedAggregates = aggregateExpressions.flatMap { agg =>
+ private[this] val computedAggregates = aggregateExpressions.flatMap { agg =>
agg.collect {
case a: AggregateExpression =>
ComputedAggregate(
@@ -89,8 +88,7 @@ case class Aggregate(
}.toArray
/** The schema of the result of all aggregate evaluations */
- @transient
- private[this] lazy val computedSchema = computedAggregates.map(_.resultAttribute)
+ private[this] val computedSchema = computedAggregates.map(_.resultAttribute)
/** Creates a new aggregate buffer for a group. */
private[this] def newAggregateBuffer(): Array[AggregateFunction] = {
@@ -104,8 +102,7 @@ case class Aggregate(
}
/** Named attributes used to substitute grouping attributes into the final result. */
- @transient
- private[this] lazy val namedGroups = groupingExpressions.map {
+ private[this] val namedGroups = groupingExpressions.map {
case ne: NamedExpression => ne -> ne.toAttribute
case e => e -> Alias(e, s"groupingExpr:$e")().toAttribute
}
@@ -114,16 +111,14 @@ case class Aggregate(
* A map of substitutions that are used to insert the aggregate expressions and grouping
* expression into the final result expression.
*/
- @transient
- private[this] lazy val resultMap =
+ private[this] val resultMap =
(computedAggregates.map { agg => agg.unbound -> agg.resultAttribute } ++ namedGroups).toMap
/**
* Substituted version of aggregateExpressions expressions which are used to compute final
* output rows given a group and the result of all aggregate computations.
*/
- @transient
- private[this] lazy val resultExpressions = aggregateExpressions.map { agg =>
+ private[this] val resultExpressions = aggregateExpressions.map { agg =>
agg.transform {
case e: Expression if resultMap.contains(e) => resultMap(e)
}