From b459aa77f63d0a469dc20e0ef555cf94382f41ca Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Sat, 5 Jul 2014 11:51:48 -0700 Subject: [SPARK-2327] [SQL] Fix nullabilities of Join/Generate/Aggregate. Fix nullabilities of `Join`/`Generate`/`Aggregate` because: - Output attributes of the opposite side of `OuterJoin` should be nullable. - Output attributes of the generator side of `Generate` should be nullable if `join` is `true` and `outer` is `true`. - `AttributeReference` of `computedAggregates` of `Aggregate` should be the same as `aggregateExpression`'s. Author: Takuya UESHIN Closes #1266 from ueshin/issues/SPARK-2327 and squashes the following commits: 3ace83a [Takuya UESHIN] Add withNullability to Attribute and use it to change nullabilities. df1ae53 [Takuya UESHIN] Modify nullabilize to leave attribute if not resolved. 799ce56 [Takuya UESHIN] Add nullabilization to Generate of SparkPlan. a0fc9bc [Takuya UESHIN] Fix scalastyle errors. 0e31e37 [Takuya UESHIN] Fix Aggregate resultAttribute nullabilities. 09532ec [Takuya UESHIN] Fix Generate output nullabilities. f20f196 [Takuya UESHIN] Fix Join output nullabilities. 
(cherry picked from commit 9d5ecf8205b924dc8a3c13fed68beb78cc5c7553) Signed-off-by: Michael Armbrust --- .../spark/sql/catalyst/analysis/unresolved.scala | 2 ++ .../sql/catalyst/expressions/BoundAttribute.scala | 16 ++++++----- .../catalyst/expressions/namedExpressions.scala | 3 ++- .../catalyst/plans/logical/basicOperators.scala | 31 ++++++++++++++++------ 4 files changed, 36 insertions(+), 16 deletions(-) (limited to 'sql/catalyst') diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index d629172a74..7abeb03296 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -52,6 +52,7 @@ case class UnresolvedAttribute(name: String) extends Attribute with trees.LeafNo override lazy val resolved = false override def newInstance = this + override def withNullability(newNullability: Boolean) = this override def withQualifiers(newQualifiers: Seq[String]) = this // Unresolved attributes are transient at compile time and don't get evaluated during execution. 
@@ -95,6 +96,7 @@ case class Star( override lazy val resolved = false override def newInstance = this + override def withNullability(newNullability: Boolean) = this override def withQualifiers(newQualifiers: Seq[String]) = this def expand(input: Seq[Attribute]): Seq[NamedExpression] = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala index 655d4a08fe..9ce1f01056 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala @@ -33,14 +33,16 @@ case class BoundReference(ordinal: Int, baseReference: Attribute) type EvaluatedType = Any - def nullable = baseReference.nullable - def dataType = baseReference.dataType - def exprId = baseReference.exprId - def qualifiers = baseReference.qualifiers - def name = baseReference.name + override def nullable = baseReference.nullable + override def dataType = baseReference.dataType + override def exprId = baseReference.exprId + override def qualifiers = baseReference.qualifiers + override def name = baseReference.name - def newInstance = BoundReference(ordinal, baseReference.newInstance) - def withQualifiers(newQualifiers: Seq[String]) = + override def newInstance = BoundReference(ordinal, baseReference.newInstance) + override def withNullability(newNullability: Boolean) = + BoundReference(ordinal, baseReference.withNullability(newNullability)) + override def withQualifiers(newQualifiers: Seq[String]) = BoundReference(ordinal, baseReference.withQualifiers(newQualifiers)) override def toString = s"$baseReference:$ordinal" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 66ae22e95b..934bad8c27 
100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -57,6 +57,7 @@ abstract class NamedExpression extends Expression { abstract class Attribute extends NamedExpression { self: Product => + def withNullability(newNullability: Boolean): Attribute def withQualifiers(newQualifiers: Seq[String]): Attribute def toAttribute = this @@ -133,7 +134,7 @@ case class AttributeReference(name: String, dataType: DataType, nullable: Boolea /** * Returns a copy of this [[AttributeReference]] with changed nullability. */ - def withNullability(newNullability: Boolean) = { + override def withNullability(newNullability: Boolean) = { if (nullable == newNullability) { this } else { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 3e0639867b..b51a02d5ac 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.{LeftSemi, JoinType} +import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.types._ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode { @@ -46,10 +46,16 @@ case class Generate( child: LogicalPlan) extends UnaryNode { - protected def generatorOutput: Seq[Attribute] = - alias + protected def generatorOutput: Seq[Attribute] = { + val output = alias .map(a => generator.output.map(_.withQualifiers(a :: Nil))) .getOrElse(generator.output) + if (join && outer) { + output.map(_.withNullability(true)) + } else 
{ + output + } + } override def output = if (join) child.output ++ generatorOutput else generatorOutput @@ -81,11 +87,20 @@ case class Join( condition: Option[Expression]) extends BinaryNode { override def references = condition.map(_.references).getOrElse(Set.empty) - override def output = joinType match { - case LeftSemi => - left.output - case _ => - left.output ++ right.output + + override def output = { + joinType match { + case LeftSemi => + left.output + case LeftOuter => + left.output ++ right.output.map(_.withNullability(true)) + case RightOuter => + left.output.map(_.withNullability(true)) ++ right.output + case FullOuter => + left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true)) + case _ => + left.output ++ right.output + } } } -- cgit v1.2.3