diff options
author | Kazuaki Ishizaki <ishizaki@jp.ibm.com> | 2016-04-01 22:38:07 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2016-04-01 22:38:07 -0700 |
commit | 877dc712e66db69cb320e10ba5edebca401591e3 (patch) | |
tree | c9eb4a9903d60a1c47e4f64d461bbf269e34f203 | |
parent | 27e71a2cd930ae28c82c9c3ee6476a12ea165fdf (diff) | |
download | spark-877dc712e66db69cb320e10ba5edebca401591e3.tar.gz spark-877dc712e66db69cb320e10ba5edebca401591e3.tar.bz2 spark-877dc712e66db69cb320e10ba5edebca401591e3.zip |
[SPARK-14138] [SQL] [MASTER] Fix generated SpecificColumnarIterator code can exceed JVM size limit for cached DataFrames
## What changes were proposed in this pull request?
This PR reduces Java byte code size of method in ```SpecificColumnarIterator``` by using a approach to make a group for lot of ```ColumnAccessor``` instantiations or method calls (more than 200) into a method
## How was this patch tested?
Added a new unit test, which includes large instantiations and method calls, to ```InMemoryColumnarQuerySuite```
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes #12108 from kiszk/SPARK-14138-master.
2 files changed, 51 insertions, 5 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala index d4e5db459f..e2e33e3246 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala @@ -88,7 +88,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera case array: ArrayType => classOf[ArrayColumnAccessor].getName case t: MapType => classOf[MapColumnAccessor].getName } - ctx.addMutableState(accessorCls, accessorName, s"$accessorName = null;") + ctx.addMutableState(accessorCls, accessorName, "") val createCode = dt match { case t if ctx.isPrimitiveType(dt) => @@ -114,6 +114,42 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera (createCode, extract + patch) }.unzip + /* + * 200 = 6000 bytes / 30 (up to 30 bytes per one call)) + * the maximum byte code size to be compiled for HotSpot is 8000. + * We should keep less than 8000 + */ + val numberOfStatementsThreshold = 200 + val (initializerAccessorCalls, extractorCalls) = + if (initializeAccessors.length <= numberOfStatementsThreshold) { + (initializeAccessors.mkString("\n"), extractors.mkString("\n")) + } else { + val groupedAccessorsItr = initializeAccessors.grouped(numberOfStatementsThreshold) + val groupedExtractorsItr = extractors.grouped(numberOfStatementsThreshold) + var groupedAccessorsLength = 0 + groupedAccessorsItr.zipWithIndex.map { case (body, i) => + groupedAccessorsLength += 1 + val funcName = s"accessors$i" + val funcCode = s""" + |private void $funcName() { + | ${body.mkString("\n")} + |} + """.stripMargin + ctx.addNewFunction(funcName, funcCode) + } + groupedExtractorsItr.zipWithIndex.map { case (body, i) => + val funcName = s"extractors$i" + val funcCode = s""" + |private void $funcName() { + | ${body.mkString("\n")} + |} + """.stripMargin + ctx.addNewFunction(funcName, funcCode) + } + ((0 to groupedAccessorsLength - 1).map { i => s"accessors$i();" }.mkString("\n"), + (0 to groupedAccessorsLength - 1).map { i => s"extractors$i();" }.mkString("\n")) + } + val code = s""" import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -149,8 +185,6 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera this.nativeOrder = ByteOrder.nativeOrder(); this.buffers = new byte[${columnTypes.length}][]; this.mutableRow = new MutableUnsafeRow(rowWriter); - - ${ctx.initMutableStates()} } public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) { @@ -159,6 +193,8 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera this.columnIndexes = columnIndexes; } + ${ctx.declareAddedFunctions()} + public boolean hasNext() { if (currentRow < numRowsInBatch) { return true; @@ -173,7 +209,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera for (int i = 0; i < columnIndexes.length; i ++) { buffers[i] = batch.buffers()[columnIndexes[i]]; } - ${initializeAccessors.mkString("\n")} + ${initializerAccessorCalls} return hasNext(); } @@ -182,7 +218,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera currentRow += 1; bufferHolder.reset(); rowWriter.zeroOutNullBytes(); - ${extractors.mkString("\n")} + ${extractorCalls} unsafeRow.setTotalSize(bufferHolder.totalSize()); return unsafeRow; } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index 9e04caf8ba..50c8745a28 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -220,4 +220,14 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { assert(data.count() === 10) assert(data.filter($"s" === "3").count() === 1) } + + test("SPARK-14138: Generated SpecificColumnarIterator can exceed JVM size limit for cached DF") { + val length1 = 3999 + val columnTypes1 = List.fill(length1)(IntegerType) + val columnarIterator1 = GenerateColumnAccessor.generate(columnTypes1) + + val length2 = 10000 + val columnTypes2 = List.fill(length2)(IntegerType) + val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2) + } } |