author     Reynold Xin <rxin@databricks.com>  2015-08-07 13:26:03 -0700
committer  Reynold Xin <rxin@databricks.com>  2015-08-07 13:26:03 -0700
commit     aeddeafc03d77a5149d2c8f9489b0ca83e6b3e03 (patch)
tree       1a036e5458db78fe294c044a72a5db2f996702a3 /sql
parent     9897cc5e3d6c70f7e45e887e2c6fc24dfa1adada (diff)
[SPARK-9667][SQL] followup: Use GenerateUnsafeProjection.canSupport to test Exchange supported data types.
This way we recursively test the data types.

cc chenghao-intel

Author: Reynold Xin <rxin@databricks.com>

Closes #8036 from rxin/cansupport and squashes the following commits:

f7302ff [Reynold Xin] Use GenerateUnsafeProjection.canSupport to test Exchange supported data types.
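For illustration, here is a minimal sketch of what a recursive data-type check of this kind looks like. The object and method names (SupportCheckSketch, supportsUnsafe) are hypothetical; this is not Spark's actual GenerateUnsafeProjection.canSupport implementation, it only shows how a recursive walk catches a UserDefinedType nested inside arrays, maps, or structs, which a top-level-only scan would miss.

import org.apache.spark.sql.types._

// Illustrative sketch only: recursively reject a schema that contains a
// UserDefinedType anywhere, not just among its top-level fields.
object SupportCheckSketch {
  def supportsUnsafe(dt: DataType): Boolean = dt match {
    case _: UserDefinedType[_]          => false                        // unsupported wherever it is nested
    case ArrayType(elementType, _)      => supportsUnsafe(elementType)
    case MapType(keyType, valueType, _) => supportsUnsafe(keyType) && supportsUnsafe(valueType)
    case StructType(fields)             => fields.forall(f => supportsUnsafe(f.dataType))
    case _                              => true                         // atomic types (int, string, ...) are fine
  }
}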
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala | 15
1 file changed, 4 insertions, 11 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 60087f2ca4..49bb729800 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -27,9 +27,9 @@ import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors.attachTree
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.types.UserDefinedType
import org.apache.spark.util.MutablePair
import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEnv}
@@ -43,18 +43,11 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
override def nodeName: String = if (tungstenMode) "TungstenExchange" else "Exchange"
/**
- * Returns true iff the children outputs aggregate UDTs that are not part of the SQL type.
- * This only happens with the old aggregate implementation and should be removed in 1.6.
+ * Returns true iff we can support the data type, and we are not doing range partitioning.
*/
private lazy val tungstenMode: Boolean = {
- val unserializableUDT = child.schema.exists(_.dataType match {
- case _: UserDefinedType[_] => true
- case _ => false
- })
- // Do not use the Unsafe path if we are using a RangePartitioning, since this may lead to
- // an interpreted RowOrdering being applied to an UnsafeRow, which will lead to
- // ClassCastExceptions at runtime. This check can be removed after SPARK-9054 is fixed.
- !unserializableUDT && !newPartitioning.isInstanceOf[RangePartitioning]
+ GenerateUnsafeProjection.canSupport(child.schema) &&
+ !newPartitioning.isInstanceOf[RangePartitioning]
}
override def outputPartitioning: Partitioning = newPartitioning
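As a usage note for the sketch above (the schema and field names are made up for illustration), the check can be applied directly to a StructType, since StructType is itself a DataType; the recursion descends through array element types and map key/value types, which is what distinguishes it from the removed top-level-only scan.

import org.apache.spark.sql.types._

// Every type in this nested schema is supported, so the sketch returns true.
// If a UserDefinedType appeared anywhere inside it (for example as the array's
// element type), the recursive walk would return false, whereas the old
// child.schema.exists(...) check only noticed a UDT at the top level.
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("tags", ArrayType(StringType)),
  StructField("metrics", MapType(StringType, StructType(Seq(
    StructField("value", DoubleType)))))))

assert(SupportCheckSketch.supportsUnsafe(schema))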