aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorTakuya UESHIN <ueshin@happy-camper.st>2014-07-05 11:51:48 -0700
committerMichael Armbrust <michael@databricks.com>2014-07-05 11:51:48 -0700
commit9d5ecf8205b924dc8a3c13fed68beb78cc5c7553 (patch)
treef6ec13b96cfa3ed5391a1538e2c8eb97713bea98 /sql/core
parent3da8df939ec63064692ba64d9188aeea908b305c (diff)
downloadspark-9d5ecf8205b924dc8a3c13fed68beb78cc5c7553.tar.gz
spark-9d5ecf8205b924dc8a3c13fed68beb78cc5c7553.tar.bz2
spark-9d5ecf8205b924dc8a3c13fed68beb78cc5c7553.zip
[SPARK-2327] [SQL] Fix nullabilities of Join/Generate/Aggregate.
Fix nullabilities of `Join`/`Generate`/`Aggregate` because: - Output attributes of the opposite side of `OuterJoin` should be nullable. - Output attributes of the generator side of `Generate` should be nullable if `join` is `true` and `outer` is `true`. - `AttributeReference` of `computedAggregates` of `Aggregate` should be the same as `aggregateExpression`'s. Author: Takuya UESHIN <ueshin@happy-camper.st> Closes #1266 from ueshin/issues/SPARK-2327 and squashes the following commits: 3ace83a [Takuya UESHIN] Add withNullability to Attribute and use it to change nullabilities. df1ae53 [Takuya UESHIN] Modify nullabilize to leave attribute if not resolved. 799ce56 [Takuya UESHIN] Add nullabilization to Generate of SparkPlan. a0fc9bc [Takuya UESHIN] Fix scalastyle errors. 0e31e37 [Takuya UESHIN] Fix Aggregate resultAttribute nullabilities. 09532ec [Takuya UESHIN] Fix Generate output nullabilities. f20f196 [Takuya UESHIN] Fix Join output nullabilities.
Diffstat (limited to 'sql/core')
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala 4
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala 12
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala 13
3 files changed, 24 insertions, 5 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
index d85d2d7844..c1ced8bfa4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
@@ -83,8 +83,8 @@ case class Aggregate(
case a: AggregateExpression =>
ComputedAggregate(
a,
- BindReferences.bindReference(a, childOutput).asInstanceOf[AggregateExpression],
- AttributeReference(s"aggResult:$a", a.dataType, nullable = true)())
+ BindReferences.bindReference(a, childOutput),
+ AttributeReference(s"aggResult:$a", a.dataType, a.nullable)())
}
}.toArray
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
index da1e08be59..47b3d00262 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.catalyst.expressions.{Generator, JoinedRow, Literal, Projection}
+import org.apache.spark.sql.catalyst.expressions._
/**
* :: DeveloperApi ::
@@ -39,8 +39,16 @@ case class Generate(
child: SparkPlan)
extends UnaryNode {
+ protected def generatorOutput: Seq[Attribute] = {
+ if (join && outer) {
+ generator.output.map(_.withNullability(true))
+ } else {
+ generator.output
+ }
+ }
+
override def output =
- if (join) child.output ++ generator.output else generator.output
+ if (join) child.output ++ generatorOutput else generatorOutput
override def execute() = {
if (join) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
index 32c5f26fe8..7d1f11caae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
@@ -319,7 +319,18 @@ case class BroadcastNestedLoopJoin(
override def otherCopyArgs = sqlContext :: Nil
- def output = left.output ++ right.output
+ override def output = {
+ joinType match {
+ case LeftOuter =>
+ left.output ++ right.output.map(_.withNullability(true))
+ case RightOuter =>
+ left.output.map(_.withNullability(true)) ++ right.output
+ case FullOuter =>
+ left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
+ case _ =>
+ left.output ++ right.output
+ }
+ }
/** The Streamed Relation */
def left = streamed