diff options
author | Davies Liu <davies@databricks.com> | 2015-10-20 14:01:53 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2015-10-20 14:01:53 -0700 |
commit | 06e6b765d0c747b773d7f3be28ddb0543c955a1f (patch) | |
tree | 13ba86c25a5471f429f0dcf2d7e37ace474a0233 /sql/catalyst | |
parent | 67d468f8d9172569ec9846edc6432240547696dd (diff) | |
download | spark-06e6b765d0c747b773d7f3be28ddb0543c955a1f.tar.gz spark-06e6b765d0c747b773d7f3be28ddb0543c955a1f.tar.bz2 spark-06e6b765d0c747b773d7f3be28ddb0543c955a1f.zip |
[SPARK-11149] [SQL] Improve cache performance for primitive types
This PR improves performance by:
1) Generating an Iterator that takes Iterator[CachedBatch] as input and calls the accessors directly (unrolling the loop over columns), avoiding the expensive Iterator.flatMap.
2) Using Unsafe.getInt/getLong/getFloat/getDouble instead of ByteBuffer.getInt/getLong/getFloat/getDouble; the latter actually read byte by byte.
3) Remove the unnecessary copy() in Coalesce(), which is not related to memory cache, found during benchmark.
The following benchmark showed that we can speed up the columnar cache of ints by 2x.
```
path = '/opt/tpcds/store_sales/'
int_cols = ['ss_sold_date_sk', 'ss_sold_time_sk', 'ss_item_sk','ss_customer_sk']
df = sqlContext.read.parquet(path).select(int_cols).cache()
df.count()
t = time.time()
print df.select("*")._jdf.queryExecution().toRdd().count()
print time.time() - t
```
Author: Davies Liu <davies@databricks.com>
Closes #9145 from davies/byte_buffer.
Diffstat (limited to 'sql/catalyst')
3 files changed, 35 insertions, 44 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala index c98182c96b..9b8b6382d7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala @@ -32,6 +32,7 @@ private class CodeFormatter { private var indentLevel = 0 private val indentSize = 2 private var indentString = "" + private var currentLine = 1 private def addLine(line: String): Unit = { val indentChange = @@ -44,11 +45,13 @@ private class CodeFormatter { } else { indentString } + code.append(f"/* ${currentLine}%03d */ ") code.append(thisLineIndent) code.append(line) code.append("\n") indentLevel = newIndentLevel indentString = " " * (indentSize * newIndentLevel) + currentLine += 1 } private def addLines(code: String): CodeFormatter = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 7544d27e3d..a4ec5085fa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -391,26 +391,24 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin classOf[ArrayData].getName, classOf[UnsafeArrayData].getName, classOf[MapData].getName, - classOf[UnsafeMapData].getName + classOf[UnsafeMapData].getName, + classOf[MutableRow].getName )) evaluator.setExtendedClass(classOf[GeneratedClass]) def formatted = CodeFormatter.format(code) - def withLineNums = formatted.split("\n").zipWithIndex.map { - case (l, n) => f"${n + 1}%03d $l" - }.mkString("\n") logDebug({ // Only add 
extra debugging info to byte code when we are going to print the source code. evaluator.setDebuggingInformation(true, true, false) - withLineNums + formatted }) try { evaluator.cook("generated.java", code) } catch { case e: Exception => - val msg = s"failed to compile: $e\n$withLineNums" + val msg = s"failed to compile: $e\n$formatted" logError(msg, e) throw new Exception(msg, e) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala index 46daa3eb8b..9da1068e9c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala @@ -29,78 +29,68 @@ class CodeFormatterSuite extends SparkFunSuite { } testCase("basic example") { - """ - |class A { + """class A { |blahblah; - |} - """.stripMargin + |}""".stripMargin }{ """ - |class A { - | blahblah; - |} + |/* 001 */ class A { + |/* 002 */ blahblah; + |/* 003 */ } """.stripMargin } testCase("nested example") { - """ - |class A { + """class A { | if (c) { |duh; |} - |} - """.stripMargin + |}""".stripMargin } { """ - |class A { - | if (c) { - | duh; - | } - |} + |/* 001 */ class A { + |/* 002 */ if (c) { + |/* 003 */ duh; + |/* 004 */ } + |/* 005 */ } """.stripMargin } testCase("single line") { - """ - |class A { + """class A { | if (c) {duh;} - |} - """.stripMargin + |}""".stripMargin }{ """ - |class A { - | if (c) {duh;} - |} + |/* 001 */ class A { + |/* 002 */ if (c) {duh;} + |/* 003 */ } """.stripMargin } testCase("if else on the same line") { - """ - |class A { + """class A { | if (c) {duh;} else {boo;} - |} - """.stripMargin + |}""".stripMargin }{ """ - |class A { - | if (c) {duh;} else {boo;} - |} + |/* 001 */ class A { + |/* 002 */ if (c) {duh;} else {boo;} + |/* 003 */ } """.stripMargin } 
testCase("function calls") { - """ - |foo( + """foo( |a, |b, - |c) - """.stripMargin + |c)""".stripMargin }{ """ - |foo( - | a, - | b, - | c) + |/* 001 */ foo( + |/* 002 */ a, + |/* 003 */ b, + |/* 004 */ c) """.stripMargin } } |