author    Davies Liu <davies@databricks.com>  2015-06-15 23:03:14 -0700
committer Reynold Xin <rxin@databricks.com>  2015-06-15 23:03:14 -0700
commit    bc76a0f7506c9796209a96b027a236270c23bbf6 (patch)
tree      e3681cdabfc89ef9b661cabf568181884d7378eb /sql
parent    1a62d61696a0481508d83a07d19ab3701245ac20 (diff)
[SPARK-7184] [SQL] enable codegen by default
In order to have better performance out of the box, this PR turns on codegen by default, so that codegen is exercised by sql/test and hive/test. This PR also fixes some corner cases for codegen. Before the 1.5 release, we should revisit this and turn it off if it's not stable or is causing regressions.

cc rxin JoshRosen

Author: Davies Liu <davies@databricks.com>

Closes #6726 from davies/enable_codegen and squashes the following commits:

f3b25a5 [Davies Liu] fix warning
73750ea [Davies Liu] fix long overflow when compare
3017a47 [Davies Liu] Merge branch 'master' of github.com:apache/spark into enable_codegen
a7d75da [Davies Liu] Merge branch 'master' of github.com:apache/spark into enable_codegen
ff5b75a [Davies Liu] Merge branch 'master' of github.com:apache/spark into enable_codegen
f4cf2c2 [Davies Liu] fix style
99fc139 [Davies Liu] Merge branch 'enable_codegen' of github.com:davies/spark into enable_codegen
91fc7a2 [Davies Liu] disable codegen for ScalaUDF
207e339 [Davies Liu] Update CodeGenerator.scala
44573a3 [Davies Liu] check thread safety of expression
f3886fa [Davies Liu] don't inline primitiveTerm for null literal
c8e7cd2 [Davies Liu] address comment
a8618c9 [Davies Liu] enable codegen by default
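Codegen in this era is controlled by the spark.sql.codegen property (the CODEGEN_ENABLED key in SQLConf.scala, updated below). A minimal sketch of opting back out after this change, assuming a Spark 1.x SQLContext:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // Sketch only: revert to interpreted expression evaluation if the new
    // default causes a regression. "spark.sql.codegen" is the property behind
    // CODEGEN_ENABLED in SQLConf.scala (see the diff below).
    val sc = new SparkContext(
      new SparkConf().setMaster("local[*]").setAppName("codegen-toggle"))
    val sqlContext = new SQLContext(sc)
    sqlContext.setConf("spark.sql.codegen", "false") // the default is now "true"
    // equivalently, per session: sqlContext.sql("SET spark.sql.codegen=false")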
Diffstat (limited to 'sql')
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala | 4
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala | 13
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala | 2
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala | 2
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala | 84
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala | 8
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala | 2
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala | 7
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala | 3
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala | 2
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala | 3
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala | 10
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 5
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala | 1
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala | 7
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala | 2
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala | 3
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala | 6
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala | 5
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala | 3
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala | 4
21 files changed, 95 insertions, 81 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
index c4dd11a451..5db2fcfcb2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
@@ -19,9 +19,9 @@ package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.errors.attachTree
-import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
+import org.apache.spark.sql.catalyst.trees
import org.apache.spark.sql.types._
-import org.apache.spark.sql.catalyst.{InternalRow, trees}
/**
* A bound reference points to a specific slot in the input tuple, allowing the actual value
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index 7427ca76b5..a10a959ae7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -61,6 +61,14 @@ abstract class Expression extends TreeNode[Expression] {
def eval(input: InternalRow = null): Any
/**
+ * Return true if this expression is thread-safe, which means it could be used by multiple
+ * threads in the same time.
+ *
+ * An expression that is not thread-safe can not be cached and re-used, especially for codegen.
+ */
+ def isThreadSafe: Boolean = true
+
+ /**
* Returns an [[GeneratedExpressionCode]], which contains Java source code that
* can be used to generate the result of evaluating the expression on an input row.
*
@@ -68,6 +76,9 @@ abstract class Expression extends TreeNode[Expression] {
* @return [[GeneratedExpressionCode]]
*/
def gen(ctx: CodeGenContext): GeneratedExpressionCode = {
+ if (!isThreadSafe) {
+ throw new Exception(s"$this is not thread-safe, can not be used in codegen")
+ }
val isNull = ctx.freshName("isNull")
val primitive = ctx.freshName("primitive")
val ve = GeneratedExpressionCode("", isNull, primitive)
@@ -169,6 +180,7 @@ abstract class BinaryExpression extends Expression with trees.BinaryNode[Express
override def toString: String = s"($left $symbol $right)"
+ override def isThreadSafe: Boolean = left.isThreadSafe && right.isThreadSafe
/**
* Short hand for generating binary evaluation code, which depends on two sub-evaluations of
* the same type. If either of the sub-expressions is null, the result of this computation
@@ -218,6 +230,7 @@ abstract class UnaryExpression extends Expression with trees.UnaryNode[Expressio
override def foldable: Boolean = child.foldable
override def nullable: Boolean = child.nullable
+ override def isThreadSafe: Boolean = child.isThreadSafe
/**
* Called by unary expressions to generate a code block that returns null if its parent returns
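The two overrides above, together with the default of true on Expression, propagate thread-safety bottom-up through the expression tree. A condensed, self-contained sketch of that pattern (stand-in classes, not the real Catalyst hierarchy):

    // Stand-in sketch of the propagation added above: safe by default,
    // derived from children, overridden to false where state is shared.
    abstract class Expr {
      def children: Seq[Expr]
      // Default: generated code for this expression may be cached and shared.
      def isThreadSafe: Boolean = children.forall(_.isThreadSafe)
    }
    case class Lit(value: Any) extends Expr { def children: Seq[Expr] = Nil }
    case class Add(left: Expr, right: Expr) extends Expr {
      def children: Seq[Expr] = Seq(left, right)
    }
    case class Udf(child: Expr) extends Expr {
      def children: Seq[Expr] = Seq(child)
      override def isThreadSafe: Boolean = false // mirrors ScalaUdf below
    }

    // One unsafe node poisons the whole tree, so gen() would refuse it:
    assert(Add(Lit(1), Lit(2)).isThreadSafe)
    assert(!Add(Lit(1), Udf(Lit(2))).isThreadSafe)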
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
index b3ce698c55..3992f1f59d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
@@ -958,4 +958,6 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
private[this] val converter = CatalystTypeConverters.createToCatalystConverter(dataType)
override def eval(input: InternalRow): Any = converter(f(input))
+ // TODO(davies): make ScalaUdf work with codegen
+ override def isThreadSafe: Boolean = false
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
index 8a34355999..4baae03b3a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.errors.TreeNodeException
-import org.apache.spark.sql.catalyst.{InternalRow, trees}
+import org.apache.spark.sql.catalyst.trees
import org.apache.spark.sql.types.DataType
abstract sealed class SortDirection
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
index 9d1e96572a..8b78c50000 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
@@ -341,31 +341,29 @@ case class MaxOf(left: Expression, right: Expression) extends BinaryArithmetic {
}
override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
- if (ctx.isNativeType(left.dataType)) {
- val eval1 = left.gen(ctx)
- val eval2 = right.gen(ctx)
- eval1.code + eval2.code + s"""
- boolean ${ev.isNull} = false;
- ${ctx.javaType(left.dataType)} ${ev.primitive} =
- ${ctx.defaultValue(left.dataType)};
-
- if (${eval1.isNull}) {
- ${ev.isNull} = ${eval2.isNull};
- ${ev.primitive} = ${eval2.primitive};
- } else if (${eval2.isNull}) {
- ${ev.isNull} = ${eval1.isNull};
+ val eval1 = left.gen(ctx)
+ val eval2 = right.gen(ctx)
+ val compCode = ctx.genComp(dataType, eval1.primitive, eval2.primitive)
+
+ eval1.code + eval2.code + s"""
+ boolean ${ev.isNull} = false;
+ ${ctx.javaType(left.dataType)} ${ev.primitive} =
+ ${ctx.defaultValue(left.dataType)};
+
+ if (${eval1.isNull}) {
+ ${ev.isNull} = ${eval2.isNull};
+ ${ev.primitive} = ${eval2.primitive};
+ } else if (${eval2.isNull}) {
+ ${ev.isNull} = ${eval1.isNull};
+ ${ev.primitive} = ${eval1.primitive};
+ } else {
+ if ($compCode > 0) {
${ev.primitive} = ${eval1.primitive};
} else {
- if (${eval1.primitive} > ${eval2.primitive}) {
- ${ev.primitive} = ${eval1.primitive};
- } else {
- ${ev.primitive} = ${eval2.primitive};
- }
+ ${ev.primitive} = ${eval2.primitive};
}
- """
- } else {
- super.genCode(ctx, ev)
- }
+ }
+ """
}
override def toString: String = s"MaxOf($left, $right)"
}
@@ -395,33 +393,29 @@ case class MinOf(left: Expression, right: Expression) extends BinaryArithmetic {
}
override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
- if (ctx.isNativeType(left.dataType)) {
-
- val eval1 = left.gen(ctx)
- val eval2 = right.gen(ctx)
-
- eval1.code + eval2.code + s"""
- boolean ${ev.isNull} = false;
- ${ctx.javaType(left.dataType)} ${ev.primitive} =
- ${ctx.defaultValue(left.dataType)};
+ val eval1 = left.gen(ctx)
+ val eval2 = right.gen(ctx)
+ val compCode = ctx.genComp(dataType, eval1.primitive, eval2.primitive)
- if (${eval1.isNull}) {
- ${ev.isNull} = ${eval2.isNull};
- ${ev.primitive} = ${eval2.primitive};
- } else if (${eval2.isNull}) {
- ${ev.isNull} = ${eval1.isNull};
+ eval1.code + eval2.code + s"""
+ boolean ${ev.isNull} = false;
+ ${ctx.javaType(left.dataType)} ${ev.primitive} =
+ ${ctx.defaultValue(left.dataType)};
+
+ if (${eval1.isNull}) {
+ ${ev.isNull} = ${eval2.isNull};
+ ${ev.primitive} = ${eval2.primitive};
+ } else if (${eval2.isNull}) {
+ ${ev.isNull} = ${eval1.isNull};
+ ${ev.primitive} = ${eval1.primitive};
+ } else {
+ if ($compCode < 0) {
${ev.primitive} = ${eval1.primitive};
} else {
- if (${eval1.primitive} < ${eval2.primitive}) {
- ${ev.primitive} = ${eval1.primitive};
- } else {
- ${ev.primitive} = ${eval2.primitive};
- }
+ ${ev.primitive} = ${eval2.primitive};
}
- """
- } else {
- super.genCode(ctx, ev)
- }
+ }
+ """
}
override def toString: String = s"MinOf($left, $right)"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 54f06aaa10..ab850d17a6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -24,7 +24,6 @@ import com.google.common.cache.{CacheBuilder, CacheLoader}
import org.codehaus.janino.ClassBodyEvaluator
import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -176,9 +175,8 @@ class CodeGenContext {
* Generate code for compare expression in Java
*/
def genComp(dataType: DataType, c1: String, c2: String): String = dataType match {
- // Use signum() to keep any small difference bwteen float/double
- case FloatType | DoubleType => s"(int)java.lang.Math.signum($c1 - $c2)"
- case dt: DataType if isPrimitiveType(dt) => s"(int)($c1 - $c2)"
+ // use c1 - c2 may overflow
+ case dt: DataType if isPrimitiveType(dt) => s"(int)($c1 > $c2 ? 1 : $c1 < $c2 ? -1 : 0)"
case BinaryType => s"org.apache.spark.sql.catalyst.util.TypeUtils.compareBinary($c1, $c2)"
case other => s"$c1.compare($c2)"
}
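This hunk is the "fix long overflow when compare" commit (73750ea) from the squash list: subtracting two longs can overflow, and the (int) cast keeps only the low 32 bits. A quick demonstration in plain Scala:

    // Why (int)(c1 - c2) is wrong for longs: the subtraction overflows and
    // the narrowing cast discards the high bits.
    val c1 = Long.MaxValue
    val c2 = Long.MinValue
    println((c1 - c2).toInt)                            // -1: claims c1 < c2 (wrong)
    println(if (c1 > c2) 1 else if (c1 < c2) -1 else 0) // 1: the new form (right)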
@@ -266,7 +264,7 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
* weak keys/values and thus does not respond to memory pressure.
*/
protected val cache = CacheBuilder.newBuilder()
- .maximumSize(1000)
+ .maximumSize(100)
.build(
new CacheLoader[InType, OutType]() {
override def load(in: InType): OutType = {
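For reference, a minimal sketch of the bounded Guava cache this hunk tunes, with hypothetical String key/value types standing in for InType/OutType:

    import com.google.common.cache.{CacheBuilder, CacheLoader}

    // Bounded loading cache: compiles on the first get() for a key, then
    // serves hits until eviction. The patch lowers the bound from 1000 to 100.
    val cache = CacheBuilder.newBuilder()
      .maximumSize(100)
      .build(new CacheLoader[String, String]() {
        override def load(in: String): String = s"compiled($in)" // runs on miss
      })
    println(cache.get("a + b")) // first get loads; subsequent gets are hits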
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index f22c8a7f6a..58dbeaf89c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -117,6 +117,8 @@ case class Alias(child: Expression, name: String)(
override def eval(input: InternalRow): Any = child.eval(input)
+ override def isThreadSafe: Boolean = child.isThreadSafe
+
override def gen(ctx: CodeGenContext): GeneratedExpressionCode = child.gen(ctx)
override def dataType: DataType = child.dataType
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala
index 0d06589a79..98acaf23c4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.analysis.UnresolvedException
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
-import org.apache.spark.sql.catalyst.trees
import org.apache.spark.sql.types.DataType
case class Coalesce(children: Seq[Expression]) extends Expression {
@@ -53,6 +52,8 @@ case class Coalesce(children: Seq[Expression]) extends Expression {
result
}
+ override def isThreadSafe: Boolean = children.forall(_.isThreadSafe)
+
override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
s"""
boolean ${ev.isNull} = true;
@@ -73,7 +74,7 @@ case class Coalesce(children: Seq[Expression]) extends Expression {
}
}
-case class IsNull(child: Expression) extends Predicate with trees.UnaryNode[Expression] {
+case class IsNull(child: Expression) extends UnaryExpression with Predicate {
override def foldable: Boolean = child.foldable
override def nullable: Boolean = false
@@ -91,7 +92,7 @@ case class IsNull(child: Expression) extends Predicate with trees.UnaryNode[Expr
override def toString: String = s"IS NULL $child"
}
-case class IsNotNull(child: Expression) extends Predicate with trees.UnaryNode[Expression] {
+case class IsNotNull(child: Expression) extends UnaryExpression with Predicate {
override def foldable: Boolean = child.foldable
override def nullable: Boolean = false
override def toString: String = s"IS NOT NULL $child"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
index 056f170539..896e383f50 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -19,8 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.analysis.UnresolvedException
import org.apache.spark.sql.catalyst.errors.TreeNodeException
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{NumericType, DataType}
+import org.apache.spark.sql.types.{DataType, NumericType}
/**
* The trait of the Window Specification (specified in the OVER clause or WINDOW clause) for
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
index 2c946cd12f..1868f119f0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters, analysis}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, analysis}
import org.apache.spark.sql.types.{StructField, StructType}
object LocalRelation {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
index b4d5e013f3..c2d739b529 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
@@ -21,7 +21,6 @@ import java.math.BigInteger
import java.sql.{Date, Timestamp}
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._
case class PrimitiveData(
@@ -75,7 +74,7 @@ case class MultipleConstructorsData(a: Int, b: String, c: Double) {
}
class ScalaReflectionSuite extends SparkFunSuite {
- import ScalaReflection._
+ import org.apache.spark.sql.catalyst.ScalaReflection._
test("primitive data") {
val schema = schemaFor[PrimitiveData]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 87f40482e3..55ab6b3358 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -171,15 +171,11 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN, "false").toBoolean
/**
- * When set to true, Spark SQL will use the Scala compiler at runtime to generate custom bytecode
+ * When set to true, Spark SQL will use the Janino at runtime to generate custom bytecode
* that evaluates expressions found in queries. In general this custom code runs much faster
- * than interpreted evaluation, but there are significant start-up costs due to compilation.
- * As a result codegen is only beneficial when queries run for a long time, or when the same
- * expressions are used multiple times.
- *
- * Defaults to false as this feature is currently experimental.
+ * than interpreted evaluation, but there are some start-up costs (5-10ms) due to compilation.
*/
- private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, "false").toBoolean
+ private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, "true").toBoolean
/**
* caseSensitive analysis true by default
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 22d0e50e4e..9d1f89d6d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -31,14 +31,13 @@ import org.apache.spark.SparkContext
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.{InternalRow, _}
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.errors.DialectException
+import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
-import org.apache.spark.sql.catalyst.ParserDialect
+import org.apache.spark.sql.catalyst.{InternalRow, ParserDialect, _}
import org.apache.spark.sql.execution.{Filter, _}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
index cc7506dec1..1949625699 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.columnar
import java.nio.{ByteBuffer, ByteOrder}
+
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.columnar.ColumnBuilder._
import org.apache.spark.sql.columnar.compression.{AllCompressionSchemes, CompressibleColumnBuilder}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 7739a9f949..2b8d302942 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -156,7 +156,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
expressions: Seq[Expression], inputSchema: Seq[Attribute]): Projection = {
log.debug(
s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
- if (codegenEnabled) {
+ if (codegenEnabled && expressions.forall(_.isThreadSafe)) {
GenerateProjection.generate(expressions, inputSchema)
} else {
new InterpretedProjection(expressions, inputSchema)
@@ -168,7 +168,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
inputSchema: Seq[Attribute]): () => MutableProjection = {
log.debug(
s"Creating MutableProj: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
- if(codegenEnabled) {
+ if(codegenEnabled && expressions.forall(_.isThreadSafe)) {
+
GenerateMutableProjection.generate(expressions, inputSchema)
} else {
() => new InterpretedMutableProjection(expressions, inputSchema)
@@ -178,7 +179,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
protected def newPredicate(
expression: Expression, inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
- if (codegenEnabled) {
+ if (codegenEnabled && expression.isThreadSafe) {
GeneratePredicate.generate(expression, inputSchema)
} else {
InterpretedPredicate.create(expression, inputSchema)
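The same guard now appears at all three call sites in SparkPlan. A hypothetical helper (not part of the patch) distilling the pattern:

    // Hypothetical helper: only hand expressions to the code generator when
    // codegen is enabled AND every expression is thread-safe, since generated
    // classes are cached and may be shared; otherwise fall back to the
    // interpreted implementation.
    trait HasThreadSafety { def isThreadSafe: Boolean }
    def choose[T](codegenEnabled: Boolean, exprs: Seq[HasThreadSafety])
                 (generated: => T)(interpreted: => T): T =
      if (codegenEnabled && exprs.forall(_.isThreadSafe)) generated
      else interpreted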
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
index 68914cf85c..3b217348b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
@@ -48,4 +48,6 @@ private[sql] case class MonotonicallyIncreasingID() extends LeafExpression {
count += 1
(TaskContext.get().partitionId().toLong << 33) + currentCount
}
+
+ override def isThreadSafe: Boolean = false
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
index 044964f3a3..412a3d4178 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
@@ -50,7 +50,8 @@ case class BroadcastLeftSemiJoinHash(
if (!rowKey.anyNull) {
val keyExists = hashSet.contains(rowKey)
if (!keyExists) {
- hashSet.add(rowKey)
+ // rowKey may be not serializable (from codegen)
+ hashSet.add(rowKey.copy())
}
}
}
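The committed comment cites serializability; a closely related hazard is that rows produced by generated projections may be mutable and reused, so storing one by reference can corrupt the set. A pure-Scala analogy (no Spark types) of what the defensive copy() guards against:

    // A mutable "row" reused across inputs; inserting the same instance into
    // a set stores a reference whose contents keep changing.
    final class MutableRow(var value: Int) {
      override def hashCode: Int = value
      override def equals(o: Any): Boolean = o match {
        case r: MutableRow => r.value == value
        case _             => false
      }
      def copy(): MutableRow = new MutableRow(value)
    }

    val set = scala.collection.mutable.HashSet.empty[MutableRow]
    val reused = new MutableRow(1)
    set += reused                             // stores a reference, not a snapshot
    reused.value = 2                          // the row is reused for the next input
    println(set.contains(new MutableRow(1)))  // false: the stored key mutated
    set += reused.copy()                      // copy() freezes the current contents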
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index bc27a9b67a..bba6f1ec96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -34,16 +34,16 @@ import org.apache.parquet.hadoop._
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.hadoop.util.ContextUtil
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.{Partition => SparkPartition, SerializableWritable, Logging, SparkException}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.RDD._
import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.RDD._
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SerializableWritable, SparkException, Partition => SparkPartition}
private[sql] class DefaultSource extends HadoopFsRelationProvider {
override def createRelation(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 1763cee419..3dbe6faabf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, FileOutputCommitter => MapReduceFileOutputCommitter}
import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.spark._
@@ -211,6 +211,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
val partitionProj = newProjection(codegenEnabled, partitionOutput, output)
val dataProj = newProjection(codegenEnabled, dataOutput, output)
+
val dataConverter: InternalRow => Row = if (needsConversion) {
CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
} else {
@@ -244,7 +245,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
inputSchema: Seq[Attribute]): Projection = {
log.debug(
s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
- if (codegenEnabled) {
+ if (codegenEnabled && expressions.forall(_.isThreadSafe)) {
GenerateProjection.generate(expressions, inputSchema)
} else {
new InterpretedProjection(expressions, inputSchema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 27534a1f48..43d3507d7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -576,6 +576,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
// Yeah, to workaround serialization...
val dataSchema = this.dataSchema
val codegenEnabled = this.codegenEnabled
+ val needConversion = this.needConversion
val requiredOutput = requiredColumns.map { col =>
val field = dataSchema(col)
@@ -590,7 +591,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
rdd.map(_.asInstanceOf[InternalRow])
}
converted.mapPartitions { rows =>
- val buildProjection = if (codegenEnabled) {
+ val buildProjection = if (codegenEnabled && requiredOutput.forall(_.isThreadSafe)) {
GenerateMutableProjection.generate(requiredOutput, dataSchema.toAttributes)
} else {
() => new InterpretedMutableProjection(requiredOutput, dataSchema.toAttributes)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index c40dd4e4b9..4986b1ea9d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -120,6 +120,8 @@ private[hive] case class HiveSimpleUdf(funcWrapper: HiveFunctionWrapper, childre
@transient
protected lazy val cached: Array[AnyRef] = new Array[AnyRef](children.length)
+ override def isThreadSafe: Boolean = false
+
// TODO: Finish input output types.
override def eval(input: InternalRow): Any = {
unwrap(
@@ -178,6 +180,8 @@ private[hive] case class HiveGenericUdf(funcWrapper: HiveFunctionWrapper, childr
lazy val dataType: DataType = inspectorToDataType(returnInspector)
+ override def isThreadSafe: Boolean = false
+
override def eval(input: InternalRow): Any = {
returnInspector // Make sure initialized.