author    Takuya UESHIN <ueshin@happy-camper.st>    2016-10-03 21:48:58 -0700
committer Reynold Xin <rxin@databricks.com>    2016-10-03 21:48:58 -0700
commit    b1b47274bfeba17a9e4e9acebd7385289f31f6c8 (patch)
tree      efdc4ab1c4ff848253d55188a16d427e9a0e44b4
parent    c571cfb2d0e1e224107fc3f0c672730cae9804cb (diff)
[SPARK-17702][SQL] Code generation including too many mutable states exceeds JVM size limit.
## What changes were proposed in this pull request?

When code generation involves too many mutable states, the generated constructor that extracts values from `references` into fields exceeds the JVM method size limit. We should split the generated extractions in the constructor into smaller functions.

## How was this patch tested?

I added some tests to check whether the generated code for such expressions exceeds the limit or not.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #15275 from ueshin/issues/SPARK-17702.
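The idea behind the patch is sketched below in a minimal, self-contained Scala form: pack the generated statements into blocks bounded by a size threshold and wrap each block in its own private Java method, so that neither the constructor nor any other single generated method approaches the JVM's 64KB bytecode limit. The `SplitSketch` object, its `split`/`freshName` helpers, and the 1024-character threshold are illustrative stand-ins, not the actual code in `CodeGenerator.scala`.

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch of splitting a long sequence of generated statements into several
// private methods, mirroring the generalized splitExpressions in this patch.
object SplitSketch {
  private var counter = 0
  private def freshName(prefix: String): String = { counter += 1; s"${prefix}_$counter" }

  /** Returns (code to emit at the call site, helper method bodies to emit in the class). */
  def split(
      statements: Seq[String],
      funcName: String,
      arguments: Seq[(String, String)]): (String, Seq[String]) = {
    val blocks = new ArrayBuffer[String]()
    val current = new StringBuilder()
    for (stmt <- statements) {
      if (current.length > 1024) {  // flush once a block grows past the (illustrative) threshold
        blocks += current.toString()
        current.clear()
      }
      current.append(stmt).append('\n')
    }
    blocks += current.toString()

    if (blocks.size == 1) {
      // Small enough: inline the code directly, no helper methods needed.
      (blocks.head, Seq.empty)
    } else {
      val base = freshName(funcName)
      val params = arguments.map { case (t, n) => s"$t $n" }.mkString(", ")
      val argList = arguments.map(_._2).mkString(", ")
      // One private Java method per block, all taking the same argument list.
      val functions = blocks.zipWithIndex.map { case (body, i) =>
        s"private void ${base}_$i($params) {\n$body}\n"
      }
      // The chain of calls replaces the original monolithic block (e.g. in the
      // constructor), while the method bodies are emitted elsewhere in the class.
      val calls = blocks.indices.map(i => s"${base}_$i($argList);").mkString("\n")
      (calls, functions.toSeq)
    }
  }
}
```

With this generalization, the same splitting logic can serve both call sites touched by the diff: `initMutableStates()` passes a function name of `"init"` with no arguments, while the row-based `splitExpressions` passes `"apply"` with an `InternalRow` parameter.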
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala | 18
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala | 3
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala | 3
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala | 4
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala | 4
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala | 3
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala | 21
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala | 4
8 files changed, 48 insertions(+), 12 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index cb808e375a..574943d3d2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -178,7 +178,10 @@ class CodegenContext {
def initMutableStates(): String = {
// It's possible that we add same mutable state twice, e.g. the `mergeExpressions` in
// `TypedAggregateExpression`, we should call `distinct` here to remove the duplicated ones.
- mutableStates.distinct.map(_._3).mkString("\n")
+ val initCodes = mutableStates.distinct.map(_._3 + "\n")
+ // The generated initialization code may exceed 64kb function size limit in JVM if there are too
+ // many mutable states, so split it into multiple functions.
+ splitExpressions(initCodes, "init", Nil)
}
/**
@@ -604,6 +607,11 @@ class CodegenContext {
// Cannot split these expressions because they are not created from a row object.
return expressions.mkString("\n")
}
+ splitExpressions(expressions, "apply", ("InternalRow", row) :: Nil)
+ }
+
+ private def splitExpressions(
+ expressions: Seq[String], funcName: String, arguments: Seq[(String, String)]): String = {
val blocks = new ArrayBuffer[String]()
val blockBuilder = new StringBuilder()
for (code <- expressions) {
@@ -623,11 +631,11 @@ class CodegenContext {
// inline execution if only one block
blocks.head
} else {
- val apply = freshName("apply")
+ val func = freshName(funcName)
val functions = blocks.zipWithIndex.map { case (body, i) =>
- val name = s"${apply}_$i"
+ val name = s"${func}_$i"
val code = s"""
- |private void $name(InternalRow $row) {
+ |private void $name(${arguments.map { case (t, name) => s"$t $name" }.mkString(", ")}) {
| $body
|}
""".stripMargin
@@ -635,7 +643,7 @@ class CodegenContext {
name
}
- functions.map(name => s"$name($row);").mkString("\n")
+ functions.map(name => s"$name(${arguments.map(_._2).mkString(", ")});").mkString("\n")
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
index 0f82d2e613..13d61af1c9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
@@ -104,7 +104,6 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
private Object[] references;
private MutableRow mutableRow;
${ctx.declareMutableStates()}
- ${ctx.declareAddedFunctions()}
public SpecificMutableProjection(Object[] references) {
this.references = references;
@@ -112,6 +111,8 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
${ctx.initMutableStates()}
}
+ ${ctx.declareAddedFunctions()}
+
public ${classOf[BaseMutableProjection].getName} target(MutableRow row) {
mutableRow = row;
return this;
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
index f1c30ef6c7..1cef95654a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
@@ -133,13 +133,14 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
private Object[] references;
${ctx.declareMutableStates()}
- ${ctx.declareAddedFunctions()}
public SpecificOrdering(Object[] references) {
this.references = references;
${ctx.initMutableStates()}
}
+ ${ctx.declareAddedFunctions()}
+
public int compare(InternalRow a, InternalRow b) {
InternalRow ${ctx.INPUT_ROW} = null; // Holds current row being evaluated.
$comparisons
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
index 106bb27964..39aa7b17de 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
@@ -40,6 +40,7 @@ object GeneratePredicate extends CodeGenerator[Expression, (InternalRow) => Bool
protected def create(predicate: Expression): ((InternalRow) => Boolean) = {
val ctx = newCodeGenContext()
val eval = predicate.genCode(ctx)
+
val codeBody = s"""
public SpecificPredicate generate(Object[] references) {
return new SpecificPredicate(references);
@@ -48,13 +49,14 @@ object GeneratePredicate extends CodeGenerator[Expression, (InternalRow) => Bool
class SpecificPredicate extends ${classOf[Predicate].getName} {
private final Object[] references;
${ctx.declareMutableStates()}
- ${ctx.declareAddedFunctions()}
public SpecificPredicate(Object[] references) {
this.references = references;
${ctx.initMutableStates()}
}
+ ${ctx.declareAddedFunctions()}
+
public boolean eval(InternalRow ${ctx.INPUT_ROW}) {
${eval.code}
return !${eval.isNull} && ${eval.value};
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
index b891f94673..1c98c9ed10 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
@@ -155,6 +155,7 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
"""
}
val allExpressions = ctx.splitExpressions(ctx.INPUT_ROW, expressionCodes)
+
val codeBody = s"""
public java.lang.Object generate(Object[] references) {
return new SpecificSafeProjection(references);
@@ -165,7 +166,6 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
private Object[] references;
private MutableRow mutableRow;
${ctx.declareMutableStates()}
- ${ctx.declareAddedFunctions()}
public SpecificSafeProjection(Object[] references) {
this.references = references;
@@ -173,6 +173,8 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
${ctx.initMutableStates()}
}
+ ${ctx.declareAddedFunctions()}
+
public java.lang.Object apply(java.lang.Object _i) {
InternalRow ${ctx.INPUT_ROW} = (InternalRow) _i;
$allExpressions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
index 75bb6936b4..7cc45372da 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
@@ -374,13 +374,14 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
private Object[] references;
${ctx.declareMutableStates()}
- ${ctx.declareAddedFunctions()}
public SpecificUnsafeProjection(Object[] references) {
this.references = references;
${ctx.initMutableStates()}
}
+ ${ctx.declareAddedFunctions()}
+
// Scala.Function1 need this
public java.lang.Object apply(java.lang.Object row) {
return apply((InternalRow) row);
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
index 45dcfcaf23..5588b44291 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst.expressions
+import java.sql.Timestamp
+
import org.apache.spark.SparkFunSuite
import org.apache.spark.metrics.source.CodegenMetrics
import org.apache.spark.sql.Row
@@ -24,7 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.objects.{CreateExternalRow, GetExternalRowField, ValidateExternalType}
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.ThreadUtils
@@ -164,6 +166,23 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
}
}
+ test("SPARK-17702: split wide constructor into blocks due to JVM code size limit") {
+ val length = 5000
+ val expressions = Seq.fill(length) {
+ ToUTCTimestamp(
+ Literal.create(Timestamp.valueOf("2015-07-24 00:00:00"), TimestampType),
+ Literal.create("PST", StringType))
+ }
+ val plan = GenerateMutableProjection.generate(expressions)
+ val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
+ val expected = Seq.fill(length)(
+ DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2015-07-24 07:00:00")))
+
+ if (!checkResult(actual, expected)) {
+ fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected")
+ }
+ }
+
test("test generated safe and unsafe projection") {
val schema = new StructType(Array(
StructField("a", StringType, true),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index fb57ed7692..62bf6f4a81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -316,14 +316,16 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
private Object[] references;
+ private scala.collection.Iterator[] inputs;
${ctx.declareMutableStates()}
public GeneratedIterator(Object[] references) {
this.references = references;
}
- public void init(int index, scala.collection.Iterator inputs[]) {
+ public void init(int index, scala.collection.Iterator[] inputs) {
partitionIndex = index;
+ this.inputs = inputs;
${ctx.initMutableStates()}
}