aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorKazuaki Ishizaki <ishizaki@jp.ibm.com>2016-04-01 22:38:07 -0700
committerDavies Liu <davies.liu@gmail.com>2016-04-01 22:38:07 -0700
commit877dc712e66db69cb320e10ba5edebca401591e3 (patch)
treec9eb4a9903d60a1c47e4f64d461bbf269e34f203 /sql
parent27e71a2cd930ae28c82c9c3ee6476a12ea165fdf (diff)
downloadspark-877dc712e66db69cb320e10ba5edebca401591e3.tar.gz
spark-877dc712e66db69cb320e10ba5edebca401591e3.tar.bz2
spark-877dc712e66db69cb320e10ba5edebca401591e3.zip
[SPARK-14138] [SQL] [MASTER] Fix generated SpecificColumnarIterator code can exceed JVM size limit for cached DataFrames
## What changes were proposed in this pull request? This PR reduces the Java byte code size of the methods in ```SpecificColumnarIterator``` by grouping large numbers (more than 200) of ```ColumnAccessor``` instantiations or method calls into separate methods. ## How was this patch tested? Added a new unit test, which includes a large number of instantiations and method calls, to ```InMemoryColumnarQuerySuite```. Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com> Closes #12108 from kiszk/SPARK-14138-master.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala46
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala10
2 files changed, 51 insertions, 5 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index d4e5db459f..e2e33e3246 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -88,7 +88,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
case array: ArrayType => classOf[ArrayColumnAccessor].getName
case t: MapType => classOf[MapColumnAccessor].getName
}
- ctx.addMutableState(accessorCls, accessorName, s"$accessorName = null;")
+ ctx.addMutableState(accessorCls, accessorName, "")
val createCode = dt match {
case t if ctx.isPrimitiveType(dt) =>
@@ -114,6 +114,42 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
(createCode, extract + patch)
}.unzip
+    /*
+     * Emit at most 200 statements per generated method:
+     * 200 = 6000 bytes / 30 (up to 30 bytes of byte code per call).
+     * The maximum byte code size HotSpot will compile is 8000, so stay well below it.
+     */
+    val numberOfStatementsThreshold = 200
+    val (initializerAccessorCalls, extractorCalls) =
+      if (initializeAccessors.length <= numberOfStatementsThreshold) {
+        (initializeAccessors.mkString("\n"), extractors.mkString("\n"))
+      } else {
+        // Materialize the groups and register the helpers with foreach: Iterator.map is
+        // lazy, so a discarded map would add no function and emit no call at all.
+        val groupedAccessors = initializeAccessors.grouped(numberOfStatementsThreshold).toList
+        val groupedExtractors = extractors.grouped(numberOfStatementsThreshold).toList
+        groupedAccessors.zipWithIndex.foreach { case (body, i) =>
+          val funcName = s"accessors$i"
+          val funcCode = s"""
+             |private void $funcName() {
+             | ${body.mkString("\n")}
+             |}
+           """.stripMargin
+          ctx.addNewFunction(funcName, funcCode)
+        }
+        groupedExtractors.zipWithIndex.foreach { case (body, i) =>
+          val funcName = s"extractors$i"
+          val funcCode = s"""
+             |private void $funcName() {
+             | ${body.mkString("\n")}
+             |}
+           """.stripMargin
+          ctx.addNewFunction(funcName, funcCode)
+        }
+        (groupedAccessors.indices.map { i => s"accessors$i();" }.mkString("\n"),
+          groupedExtractors.indices.map { i => s"extractors$i();" }.mkString("\n"))
+      }
+
val code = s"""
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -149,8 +185,6 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
this.nativeOrder = ByteOrder.nativeOrder();
this.buffers = new byte[${columnTypes.length}][];
this.mutableRow = new MutableUnsafeRow(rowWriter);
-
- ${ctx.initMutableStates()}
}
public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) {
@@ -159,6 +193,8 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
this.columnIndexes = columnIndexes;
}
+ ${ctx.declareAddedFunctions()}
+
public boolean hasNext() {
if (currentRow < numRowsInBatch) {
return true;
@@ -173,7 +209,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
for (int i = 0; i < columnIndexes.length; i ++) {
buffers[i] = batch.buffers()[columnIndexes[i]];
}
- ${initializeAccessors.mkString("\n")}
+ ${initializerAccessorCalls}
return hasNext();
}
@@ -182,7 +218,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
currentRow += 1;
bufferHolder.reset();
rowWriter.zeroOutNullBytes();
- ${extractors.mkString("\n")}
+ ${extractorCalls}
unsafeRow.setTotalSize(bufferHolder.totalSize());
return unsafeRow;
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 9e04caf8ba..50c8745a28 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -220,4 +220,14 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
assert(data.count() === 10)
assert(data.filter($"s" === "3").count() === 1)
}
+
+  test("SPARK-14138: Generated SpecificColumnarIterator can exceed JVM size limit for cached DF") {
+    // Code generation has to finish without throwing for very wide schemas.
+    // Two all-integer schemas are generated one after the other: 3999 and 10000 columns.
+    // The generated iterators themselves are not inspected.
+    Seq(3999, 10000).foreach { numColumns =>
+      val wideSchema = List.fill(numColumns)(IntegerType)
+      GenerateColumnAccessor.generate(wideSchema)
+    }
+  }
}