aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-06-30 10:48:49 -0700
committerDavies Liu <davies@databricks.com>2015-06-30 10:48:49 -0700
commitfbb267ed6fe799a58f88c2fba2d41e954e5f1547 (patch)
tree19f499236591e938de23399880a9e42a55a0ad46 /sql/core
parent5fa0863626aaf5a9a41756a0b1ec82bddccbf067 (diff)
downloadspark-fbb267ed6fe799a58f88c2fba2d41e954e5f1547.tar.gz
spark-fbb267ed6fe799a58f88c2fba2d41e954e5f1547.tar.bz2
spark-fbb267ed6fe799a58f88c2fba2d41e954e5f1547.zip
[SPARK-8713] Make codegen thread safe
Codegen takes three steps: 1. Take a list of expressions, convert them into Java source code and a list of expressions that don't support codegen (fall back to interpreted mode). 2. Compile the Java source into a Java class (bytecode). 3. Using the Java class and the list of expressions, build a Projection. Currently, we cache all three steps: the key is a list of expressions, and the result is a projection. Because some expressions (which may not be thread-safe, for example, Random) will be held by the Projection, the projection may not be thread safe. This PR changes it to only cache the second step; then we can build projections using codegen even when some expressions are not thread-safe, because the cache will no longer hold any expressions. cc marmbrus rxin JoshRosen Author: Davies Liu <davies@databricks.com> Closes #7101 from davies/codegen_safe and squashes the following commits: 7dd41f1 [Davies Liu] Merge branch 'master' of github.com:apache/spark into codegen_safe 847bd08 [Davies Liu] don't use scala.refect 4ddaaed [Davies Liu] Merge branch 'master' of github.com:apache/spark into codegen_safe 1793cf1 [Davies Liu] make codegen thread safe
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala2
4 files changed, 5 insertions, 7 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 47f56b2b7e..7739a9f949 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -156,7 +156,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
expressions: Seq[Expression], inputSchema: Seq[Attribute]): Projection = {
log.debug(
s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
- if (codegenEnabled && expressions.forall(_.isThreadSafe)) {
+ if (codegenEnabled) {
GenerateProjection.generate(expressions, inputSchema)
} else {
new InterpretedProjection(expressions, inputSchema)
@@ -168,7 +168,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
inputSchema: Seq[Attribute]): () => MutableProjection = {
log.debug(
s"Creating MutableProj: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
- if(codegenEnabled && expressions.forall(_.isThreadSafe)) {
+ if(codegenEnabled) {
GenerateMutableProjection.generate(expressions, inputSchema)
} else {
() => new InterpretedMutableProjection(expressions, inputSchema)
@@ -178,7 +178,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
protected def newPredicate(
expression: Expression, inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
- if (codegenEnabled && expression.isThreadSafe) {
+ if (codegenEnabled) {
GeneratePredicate.generate(expression, inputSchema)
} else {
InterpretedPredicate.create(expression, inputSchema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
index 3b217348b7..68914cf85c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
@@ -48,6 +48,4 @@ private[sql] case class MonotonicallyIncreasingID() extends LeafExpression {
count += 1
(TaskContext.get().partitionId().toLong << 33) + currentCount
}
-
- override def isThreadSafe: Boolean = false
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 54c8eeb41a..42b51caab5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -270,7 +270,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
inputSchema: Seq[Attribute]): Projection = {
log.debug(
s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
- if (codegenEnabled && expressions.forall(_.isThreadSafe)) {
+ if (codegenEnabled) {
GenerateProjection.generate(expressions, inputSchema)
} else {
new InterpretedProjection(expressions, inputSchema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 7005c7079a..0b875304f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -591,7 +591,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
rdd.map(_.asInstanceOf[InternalRow])
}
converted.mapPartitions { rows =>
- val buildProjection = if (codegenEnabled && requiredOutput.forall(_.isThreadSafe)) {
+ val buildProjection = if (codegenEnabled) {
GenerateMutableProjection.generate(requiredOutput, dataSchema.toAttributes)
} else {
() => new InterpretedMutableProjection(requiredOutput, dataSchema.toAttributes)