diff options
author | Davies Liu <davies@databricks.com> | 2015-06-15 23:03:14 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-06-15 23:03:14 -0700 |
commit | bc76a0f7506c9796209a96b027a236270c23bbf6 (patch) | |
tree | e3681cdabfc89ef9b661cabf568181884d7378eb /sql | |
parent | 1a62d61696a0481508d83a07d19ab3701245ac20 (diff) | |
download | spark-bc76a0f7506c9796209a96b027a236270c23bbf6.tar.gz spark-bc76a0f7506c9796209a96b027a236270c23bbf6.tar.bz2 spark-bc76a0f7506c9796209a96b027a236270c23bbf6.zip |
[SPARK-7184] [SQL] enable codegen by default
In order to have better performance out of box, this PR turn on codegen by default, then codegen can be tested by sql/test and hive/test.
This PR also fix some corner cases for codegen.
Before 1.5 release, we should re-visit this, turn it off if it's not stable or causing regressions.
cc rxin JoshRosen
Author: Davies Liu <davies@databricks.com>
Closes #6726 from davies/enable_codegen and squashes the following commits:
f3b25a5 [Davies Liu] fix warning
73750ea [Davies Liu] fix long overflow when compare
3017a47 [Davies Liu] Merge branch 'master' of github.com:apache/spark into enable_codegen
a7d75da [Davies Liu] Merge branch 'master' of github.com:apache/spark into enable_codegen
ff5b75a [Davies Liu] Merge branch 'master' of github.com:apache/spark into enable_codegen
f4cf2c2 [Davies Liu] fix style
99fc139 [Davies Liu] Merge branch 'enable_codegen' of github.com:davies/spark into enable_codegen
91fc7a2 [Davies Liu] disable codegen for ScalaUDF
207e339 [Davies Liu] Update CodeGenerator.scala
44573a3 [Davies Liu] check thread safety of expression
f3886fa [Davies Liu] don't inline primitiveTerm for null literal
c8e7cd2 [Davies Liu] address comment
a8618c9 [Davies Liu] enable codegen by default
Diffstat (limited to 'sql')
21 files changed, 95 insertions, 81 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala index c4dd11a451..5db2fcfcb2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala @@ -19,9 +19,9 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.Logging import org.apache.spark.sql.catalyst.errors.attachTree -import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode} +import org.apache.spark.sql.catalyst.trees import org.apache.spark.sql.types._ -import org.apache.spark.sql.catalyst.{InternalRow, trees} /** * A bound reference points to a specific slot in the input tuple, allowing the actual value diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 7427ca76b5..a10a959ae7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -61,6 +61,14 @@ abstract class Expression extends TreeNode[Expression] { def eval(input: InternalRow = null): Any /** + * Return true if this expression is thread-safe, which means it could be used by multiple + * threads in the same time. + * + * An expression that is not thread-safe can not be cached and re-used, especially for codegen. + */ + def isThreadSafe: Boolean = true + + /** * Returns an [[GeneratedExpressionCode]], which contains Java source code that * can be used to generate the result of evaluating the expression on an input row. * @@ -68,6 +76,9 @@ abstract class Expression extends TreeNode[Expression] { * @return [[GeneratedExpressionCode]] */ def gen(ctx: CodeGenContext): GeneratedExpressionCode = { + if (!isThreadSafe) { + throw new Exception(s"$this is not thread-safe, can not be used in codegen") + } val isNull = ctx.freshName("isNull") val primitive = ctx.freshName("primitive") val ve = GeneratedExpressionCode("", isNull, primitive) @@ -169,6 +180,7 @@ abstract class BinaryExpression extends Expression with trees.BinaryNode[Express override def toString: String = s"($left $symbol $right)" + override def isThreadSafe: Boolean = left.isThreadSafe && right.isThreadSafe /** * Short hand for generating binary evaluation code, which depends on two sub-evaluations of * the same type. If either of the sub-expressions is null, the result of this computation @@ -218,6 +230,7 @@ abstract class UnaryExpression extends Expression with trees.UnaryNode[Expressio override def foldable: Boolean = child.foldable override def nullable: Boolean = child.nullable + override def isThreadSafe: Boolean = child.isThreadSafe /** * Called by unary expressions to generate a code block that returns null if its parent returns diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala index b3ce698c55..3992f1f59d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala @@ -958,4 +958,6 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi private[this] val converter = CatalystTypeConverters.createToCatalystConverter(dataType) override def eval(input: InternalRow): Any = converter(f(input)) + // TODO(davies): make ScalaUdf work with codegen + override def isThreadSafe: Boolean = false } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala index 8a34355999..4baae03b3a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.errors.TreeNodeException -import org.apache.spark.sql.catalyst.{InternalRow, trees} +import org.apache.spark.sql.catalyst.trees import org.apache.spark.sql.types.DataType abstract sealed class SortDirection diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala index 9d1e96572a..8b78c50000 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala @@ -341,31 +341,29 @@ case class MaxOf(left: Expression, right: Expression) extends BinaryArithmetic { } override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { - if (ctx.isNativeType(left.dataType)) { - val eval1 = left.gen(ctx) - val eval2 = right.gen(ctx) - eval1.code + eval2.code + s""" - boolean ${ev.isNull} = false; - ${ctx.javaType(left.dataType)} ${ev.primitive} = - ${ctx.defaultValue(left.dataType)}; - - if (${eval1.isNull}) { - ${ev.isNull} = ${eval2.isNull}; - ${ev.primitive} = ${eval2.primitive}; - } else if (${eval2.isNull}) { - ${ev.isNull} = ${eval1.isNull}; + val eval1 = left.gen(ctx) + val eval2 = right.gen(ctx) + val compCode = ctx.genComp(dataType, eval1.primitive, eval2.primitive) + + eval1.code + eval2.code + s""" + boolean ${ev.isNull} = false; + ${ctx.javaType(left.dataType)} ${ev.primitive} = + ${ctx.defaultValue(left.dataType)}; + + if (${eval1.isNull}) { + ${ev.isNull} = ${eval2.isNull}; + ${ev.primitive} = ${eval2.primitive}; + } else if (${eval2.isNull}) { + ${ev.isNull} = ${eval1.isNull}; + ${ev.primitive} = ${eval1.primitive}; + } else { + if ($compCode > 0) { ${ev.primitive} = ${eval1.primitive}; } else { - if (${eval1.primitive} > ${eval2.primitive}) { - ${ev.primitive} = ${eval1.primitive}; - } else { - ${ev.primitive} = ${eval2.primitive}; - } + ${ev.primitive} = ${eval2.primitive}; } - """ - } else { - super.genCode(ctx, ev) - } + } + """ } override def toString: String = s"MaxOf($left, $right)" } @@ -395,33 +393,29 @@ case class MinOf(left: Expression, right: Expression) extends BinaryArithmetic { } override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { - if (ctx.isNativeType(left.dataType)) { - - val eval1 = left.gen(ctx) - val eval2 = right.gen(ctx) - - eval1.code + eval2.code + s""" - boolean ${ev.isNull} = false; - ${ctx.javaType(left.dataType)} ${ev.primitive} = - ${ctx.defaultValue(left.dataType)}; + val eval1 = left.gen(ctx) + val eval2 = right.gen(ctx) + val compCode = ctx.genComp(dataType, eval1.primitive, eval2.primitive) - if (${eval1.isNull}) { - ${ev.isNull} = ${eval2.isNull}; - ${ev.primitive} = ${eval2.primitive}; - } else if (${eval2.isNull}) { - ${ev.isNull} = ${eval1.isNull}; + eval1.code + eval2.code + s""" + boolean ${ev.isNull} = false; + ${ctx.javaType(left.dataType)} ${ev.primitive} = + ${ctx.defaultValue(left.dataType)}; + + if (${eval1.isNull}) { + ${ev.isNull} = ${eval2.isNull}; + ${ev.primitive} = ${eval2.primitive}; + } else if (${eval2.isNull}) { + ${ev.isNull} = ${eval1.isNull}; + ${ev.primitive} = ${eval1.primitive}; + } else { + if ($compCode < 0) { ${ev.primitive} = ${eval1.primitive}; } else { - if (${eval1.primitive} < ${eval2.primitive}) { - ${ev.primitive} = ${eval1.primitive}; - } else { - ${ev.primitive} = ${eval2.primitive}; - } + ${ev.primitive} = ${eval2.primitive}; } - """ - } else { - super.genCode(ctx, ev) - } + } + """ } override def toString: String = s"MinOf($left, $right)" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 54f06aaa10..ab850d17a6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -24,7 +24,6 @@ import com.google.common.cache.{CacheBuilder, CacheLoader} import org.codehaus.janino.ClassBodyEvaluator import org.apache.spark.Logging -import org.apache.spark.sql.catalyst import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -176,9 +175,8 @@ class CodeGenContext { * Generate code for compare expression in Java */ def genComp(dataType: DataType, c1: String, c2: String): String = dataType match { - // Use signum() to keep any small difference bwteen float/double - case FloatType | DoubleType => s"(int)java.lang.Math.signum($c1 - $c2)" - case dt: DataType if isPrimitiveType(dt) => s"(int)($c1 - $c2)" + // use c1 - c2 may overflow + case dt: DataType if isPrimitiveType(dt) => s"(int)($c1 > $c2 ? 1 : $c1 < $c2 ? -1 : 0)" case BinaryType => s"org.apache.spark.sql.catalyst.util.TypeUtils.compareBinary($c1, $c2)" case other => s"$c1.compare($c2)" } @@ -266,7 +264,7 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin * weak keys/values and thus does not respond to memory pressure. */ protected val cache = CacheBuilder.newBuilder() - .maximumSize(1000) + .maximumSize(100) .build( new CacheLoader[InType, OutType]() { override def load(in: InType): OutType = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index f22c8a7f6a..58dbeaf89c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -117,6 +117,8 @@ case class Alias(child: Expression, name: String)( override def eval(input: InternalRow): Any = child.eval(input) + override def isThreadSafe: Boolean = child.isThreadSafe + override def gen(ctx: CodeGenContext): GeneratedExpressionCode = child.gen(ctx) override def dataType: DataType = child.dataType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala index 0d06589a79..98acaf23c4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.analysis.UnresolvedException import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode} -import org.apache.spark.sql.catalyst.trees import org.apache.spark.sql.types.DataType case class Coalesce(children: Seq[Expression]) extends Expression { @@ -53,6 +52,8 @@ case class Coalesce(children: Seq[Expression]) extends Expression { result } + override def isThreadSafe: Boolean = children.forall(_.isThreadSafe) + override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { s""" boolean ${ev.isNull} = true; @@ -73,7 +74,7 @@ case class Coalesce(children: Seq[Expression]) extends Expression { } } -case class IsNull(child: Expression) extends Predicate with trees.UnaryNode[Expression] { +case class IsNull(child: Expression) extends UnaryExpression with Predicate { override def foldable: Boolean = child.foldable override def nullable: Boolean = false @@ -91,7 +92,7 @@ case class IsNull(child: Expression) extends Predicate with trees.UnaryNode[Expr override def toString: String = s"IS NULL $child" } -case class IsNotNull(child: Expression) extends Predicate with trees.UnaryNode[Expression] { +case class IsNotNull(child: Expression) extends UnaryExpression with Predicate { override def foldable: Boolean = child.foldable override def nullable: Boolean = false override def toString: String = s"IS NOT NULL $child" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala index 056f170539..896e383f50 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala @@ -19,8 +19,7 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.analysis.UnresolvedException import org.apache.spark.sql.catalyst.errors.TreeNodeException -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{NumericType, DataType} +import org.apache.spark.sql.types.{DataType, NumericType} /** * The trait of the Window Specification (specified in the OVER clause or WINDOW clause) for diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala index 2c946cd12f..1868f119f0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters, analysis} +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, analysis} import org.apache.spark.sql.types.{StructField, StructType} object LocalRelation { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala index b4d5e013f3..c2d739b529 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala @@ -21,7 +21,6 @@ import java.math.BigInteger import java.sql.{Date, Timestamp} import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types._ case class PrimitiveData( @@ -75,7 +74,7 @@ case class MultipleConstructorsData(a: Int, b: String, c: Double) { } class ScalaReflectionSuite extends SparkFunSuite { - import ScalaReflection._ + import org.apache.spark.sql.catalyst.ScalaReflection._ test("primitive data") { val schema = schemaFor[PrimitiveData] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 87f40482e3..55ab6b3358 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -171,15 +171,11 @@ private[sql] class SQLConf extends Serializable with CatalystConf { private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN, "false").toBoolean /** - * When set to true, Spark SQL will use the Scala compiler at runtime to generate custom bytecode + * When set to true, Spark SQL will use the Janino at runtime to generate custom bytecode * that evaluates expressions found in queries. In general this custom code runs much faster - * than interpreted evaluation, but there are significant start-up costs due to compilation. - * As a result codegen is only beneficial when queries run for a long time, or when the same - * expressions are used multiple times. - * - * Defaults to false as this feature is currently experimental. + * than interpreted evaluation, but there are some start-up costs (5-10ms) due to compilation. */ - private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, "false").toBoolean + private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, "true").toBoolean /** * caseSensitive analysis true by default diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 22d0e50e4e..9d1f89d6d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -31,14 +31,13 @@ import org.apache.spark.SparkContext import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.{InternalRow, _} import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.errors.DialectException +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor -import org.apache.spark.sql.catalyst.ParserDialect +import org.apache.spark.sql.catalyst.{InternalRow, ParserDialect, _} import org.apache.spark.sql.execution.{Filter, _} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala index cc7506dec1..1949625699 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.columnar import java.nio.{ByteBuffer, ByteOrder} + import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.columnar.ColumnBuilder._ import org.apache.spark.sql.columnar.compression.{AllCompressionSchemes, CompressibleColumnBuilder} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 7739a9f949..2b8d302942 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -156,7 +156,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ expressions: Seq[Expression], inputSchema: Seq[Attribute]): Projection = { log.debug( s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled") - if (codegenEnabled) { + if (codegenEnabled && expressions.forall(_.isThreadSafe)) { GenerateProjection.generate(expressions, inputSchema) } else { new InterpretedProjection(expressions, inputSchema) @@ -168,7 +168,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ inputSchema: Seq[Attribute]): () => MutableProjection = { log.debug( s"Creating MutableProj: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled") - if(codegenEnabled) { + if(codegenEnabled && expressions.forall(_.isThreadSafe)) { + GenerateMutableProjection.generate(expressions, inputSchema) } else { () => new InterpretedMutableProjection(expressions, inputSchema) @@ -178,7 +179,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ protected def newPredicate( expression: Expression, inputSchema: Seq[Attribute]): (InternalRow) => Boolean = { - if (codegenEnabled) { + if (codegenEnabled && expression.isThreadSafe) { GeneratePredicate.generate(expression, inputSchema) } else { InterpretedPredicate.create(expression, inputSchema) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala index 68914cf85c..3b217348b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala @@ -48,4 +48,6 @@ private[sql] case class MonotonicallyIncreasingID() extends LeafExpression { count += 1 (TaskContext.get().partitionId().toLong << 33) + currentCount } + + override def isThreadSafe: Boolean = false } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala index 044964f3a3..412a3d4178 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala @@ -50,7 +50,8 @@ case class BroadcastLeftSemiJoinHash( if (!rowKey.anyNull) { val keyExists = hashSet.contains(rowKey) if (!keyExists) { - hashSet.add(rowKey) + // rowKey may be not serializable (from codegen) + hashSet.add(rowKey.copy()) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index bc27a9b67a..bba6f1ec96 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -34,16 +34,16 @@ import org.apache.parquet.hadoop._ import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.apache.parquet.hadoop.util.ContextUtil -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.{Partition => SparkPartition, SerializableWritable, Logging, SparkException} import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.rdd.RDD._ import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.RDD._ import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.util.Utils +import org.apache.spark.{Logging, SerializableWritable, SparkException, Partition => SparkPartition} private[sql] class DefaultSource extends HadoopFsRelationProvider { override def createRelation( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index 1763cee419..3dbe6faabf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -23,7 +23,7 @@ import scala.collection.mutable import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce._ -import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat} +import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, FileOutputCommitter => MapReduceFileOutputCommitter} import org.apache.parquet.hadoop.util.ContextUtil import org.apache.spark._ @@ -211,6 +211,7 @@ private[sql] case class InsertIntoHadoopFsRelation( val partitionProj = newProjection(codegenEnabled, partitionOutput, output) val dataProj = newProjection(codegenEnabled, dataOutput, output) + val dataConverter: InternalRow => Row = if (needsConversion) { CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row] } else { @@ -244,7 +245,7 @@ private[sql] case class InsertIntoHadoopFsRelation( inputSchema: Seq[Attribute]): Projection = { log.debug( s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled") - if (codegenEnabled) { + if (codegenEnabled && expressions.forall(_.isThreadSafe)) { GenerateProjection.generate(expressions, inputSchema) } else { new InterpretedProjection(expressions, inputSchema) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 27534a1f48..43d3507d7d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -576,6 +576,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio // Yeah, to workaround serialization... val dataSchema = this.dataSchema val codegenEnabled = this.codegenEnabled + val needConversion = this.needConversion val requiredOutput = requiredColumns.map { col => val field = dataSchema(col) @@ -590,7 +591,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio rdd.map(_.asInstanceOf[InternalRow]) } converted.mapPartitions { rows => - val buildProjection = if (codegenEnabled) { + val buildProjection = if (codegenEnabled && requiredOutput.forall(_.isThreadSafe)) { GenerateMutableProjection.generate(requiredOutput, dataSchema.toAttributes) } else { () => new InterpretedMutableProjection(requiredOutput, dataSchema.toAttributes) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala index c40dd4e4b9..4986b1ea9d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala @@ -120,6 +120,8 @@ private[hive] case class HiveSimpleUdf(funcWrapper: HiveFunctionWrapper, childre @transient protected lazy val cached: Array[AnyRef] = new Array[AnyRef](children.length) + override def isThreadSafe: Boolean = false + // TODO: Finish input output types. override def eval(input: InternalRow): Any = { unwrap( @@ -178,6 +180,8 @@ private[hive] case class HiveGenericUdf(funcWrapper: HiveFunctionWrapper, childr lazy val dataType: DataType = inspectorToDataType(returnInspector) + override def isThreadSafe: Boolean = false + override def eval(input: InternalRow): Any = { returnInspector // Make sure initialized. |