diff options
author | Wenchen Fan <wenchen@databricks.com> | 2016-05-24 21:23:39 -0700 |
---|---|---|
committer | Cheng Lian <lian@databricks.com> | 2016-05-24 21:23:39 -0700 |
commit | 50b660d725269dc0c11e0d350ddd7fc8b19539a0 (patch) | |
tree | 6cbf51ce284def719bd30f4f90d086e9693de2c1 /sql | |
parent | 4acababcaba567c85f3be0d5e939d99119b82d1d (diff) | |
download | spark-50b660d725269dc0c11e0d350ddd7fc8b19539a0.tar.gz spark-50b660d725269dc0c11e0d350ddd7fc8b19539a0.tar.bz2 spark-50b660d725269dc0c11e0d350ddd7fc8b19539a0.zip |
[SPARK-15498][TESTS] fix slow tests
## What changes were proposed in this pull request?
This PR fixes 3 slow tests:
1. `ParquetQuerySuite.read/write wide table`: This is not a good unit test, as it runs for more than 5 minutes. This PR removes it and adds a new regression test in `CodeGenerationSuite`, which is closer to a true unit test.
2. `ParquetQuerySuite.returning batch for wide table`: reduce the threshold and use a smaller data size.
3. `DatasetSuite.SPARK-14554: Dataset.map may generate wrong java code for wide table`: Improving `CodeFormatter.format` (introduced at https://github.com/apache/spark/pull/12979) dramatically speeds this test up.
## How was this patch tested?
N/A
Author: Wenchen Fan <wenchen@databricks.com>
Closes #13273 from cloud-fan/test.
Diffstat (limited to 'sql')
5 files changed, 128 insertions, 79 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala index 855ae6432d..05b7c96e44 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.expressions.codegen -import org.apache.commons.lang3.StringUtils +import java.util.regex.Matcher /** * An utility class that indents a block of code based on the curly braces and parentheses. @@ -26,13 +26,17 @@ import org.apache.commons.lang3.StringUtils * Written by Matei Zaharia. */ object CodeFormatter { + val commentHolder = """\/\*(.+?)\*\/""".r + def format(code: CodeAndComment): String = { - new CodeFormatter().addLines( - StringUtils.replaceEach( - code.body, - code.comment.keys.toArray, - code.comment.values.toArray) - ).result + val formatter = new CodeFormatter + code.body.split("\n").foreach { line => + val commentReplaced = commentHolder.replaceAllIn( + line.trim, + m => code.comment.get(m.group(1)).map(Matcher.quoteReplacement).getOrElse(m.group(0))) + formatter.addLine(commentReplaced) + } + formatter.result() } def stripExtraNewLines(input: String): String = { @@ -53,16 +57,28 @@ object CodeFormatter { def stripOverlappingComments(codeAndComment: CodeAndComment): CodeAndComment = { val code = new StringBuilder val map = codeAndComment.comment + + def getComment(line: String): Option[String] = { + if (line.startsWith("/*") && line.endsWith("*/")) { + map.get(line.substring(2, line.length - 2)) + } else { + None + } + } + var lastLine: String = "dummy" codeAndComment.body.split('\n').foreach { l => val line = l.trim() - val skip = lastLine.startsWith("/*") && lastLine.endsWith("*/") && - line.startsWith("/*") && line.endsWith("*/") && - 
map(lastLine).substring(3).contains(map(line).substring(3)) + + val skip = getComment(lastLine).zip(getComment(line)).exists { + case (lastComment, currentComment) => + lastComment.substring(3).contains(currentComment.substring(3)) + } + if (!skip) { - code.append(line) - code.append("\n") + code.append(line).append("\n") } + lastLine = line } new CodeAndComment(code.result().trim(), map) @@ -117,8 +133,9 @@ private class CodeFormatter { } else { indentString } - code.append(f"/* ${currentLine}%03d */ ") + code.append(f"/* ${currentLine}%03d */") if (line.trim().length > 0) { + code.append(" ") // add a space after the line number comment. code.append(thisLineIndent) if (inCommentBlock && line.startsWith("*") || line.startsWith("*/")) code.append(" ") code.append(line) @@ -129,10 +146,5 @@ private class CodeFormatter { currentLine += 1 } - private def addLines(code: String): CodeFormatter = { - code.split('\n').foreach(s => addLine(s.trim())) - this - } - private def result(): String = code.result() } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 8b74d606db..2706c38d9e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -718,27 +718,17 @@ class CodegenContext { def getPlaceHolderToComments(): collection.Map[String, String] = placeHolderToComments /** - * Register a multi-line comment and return the corresponding place holder - */ - private def registerMultilineComment(text: String): String = { - val placeHolder = s"/*${freshName("c")}*/" - val comment = text.split("(\r\n)|\r|\n").mkString("/**\n * ", "\n * ", "\n */") - placeHolderToComments += (placeHolder -> comment) - placeHolder - } - - /** * Register a comment and return the 
corresponding place holder */ def registerComment(text: String): String = { - if (text.contains("\n") || text.contains("\r")) { - registerMultilineComment(text) + val name = freshName("c") + val comment = if (text.contains("\n") || text.contains("\r")) { + text.split("(\r\n)|\r|\n").mkString("/**\n * ", "\n * ", "\n */") } else { - val placeHolder = s"/*${freshName("c")}*/" - val safeComment = s"// $text" - placeHolderToComments += (placeHolder -> safeComment) - placeHolder + s"// $text" } + placeHolderToComments += (name -> comment) + s"/*$name*/" } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala index db34d12e28..8ffe39084c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.objects.CreateExternalRow import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -137,6 +138,19 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper { } } + test("SPARK-14224: split wide external row creation into blocks due to JVM code size limit") { + val length = 5000 + val schema = StructType(Seq.fill(length)(StructField("int", IntegerType))) + val expressions = Seq(CreateExternalRow(Seq.fill(length)(Literal(1)), schema)) + val plan = GenerateMutableProjection.generate(expressions) + val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType)) + val expected = 
Seq(Row.fromSeq(Seq.fill(length)(1))) + + if (!checkResult(actual, expected)) { + fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected") + } + } + test("test generated safe and unsafe projection") { val schema = new StructType(Array( StructField("a", StringType, true), diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala index 76afc2e8ec..bc5a8f0782 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala @@ -23,9 +23,10 @@ import org.apache.spark.sql.catalyst.util._ class CodeFormatterSuite extends SparkFunSuite { - def testCase(name: String)(input: String)(expected: String): Unit = { + def testCase(name: String)( + input: String, comment: Map[String, String] = Map.empty)(expected: String): Unit = { test(name) { - val sourceCode = new CodeAndComment(input, Map.empty) + val sourceCode = new CodeAndComment(input.trim, comment) if (CodeFormatter.format(sourceCode).trim !== expected.trim) { fail( s""" @@ -43,9 +44,9 @@ class CodeFormatterSuite extends SparkFunSuite { |/*project_c2*/ """.stripMargin, Map( - "/*project_c4*/" -> "// (((input[0, bigint, false] + 1) + 2) + 3))", - "/*project_c3*/" -> "// ((input[0, bigint, false] + 1) + 2)", - "/*project_c2*/" -> "// (input[0, bigint, false] + 1)" + "project_c4" -> "// (((input[0, bigint, false] + 1) + 2) + 3))", + "project_c3" -> "// ((input[0, bigint, false] + 1) + 2)", + "project_c2" -> "// (input[0, bigint, false] + 1)" )) val reducedCode = CodeFormatter.stripOverlappingComments(code) @@ -53,9 +54,11 @@ class CodeFormatterSuite extends SparkFunSuite { } testCase("basic example") { - """class A { + """ + |class A { |blahblah; - |}""".stripMargin + |} 
+ """.stripMargin }{ """ |/* 001 */ class A { @@ -65,11 +68,13 @@ class CodeFormatterSuite extends SparkFunSuite { } testCase("nested example") { - """class A { + """ + |class A { | if (c) { |duh; |} - |}""".stripMargin + |} + """.stripMargin } { """ |/* 001 */ class A { @@ -81,9 +86,11 @@ class CodeFormatterSuite extends SparkFunSuite { } testCase("single line") { - """class A { + """ + |class A { | if (c) {duh;} - |}""".stripMargin + |} + """.stripMargin }{ """ |/* 001 */ class A { @@ -93,9 +100,11 @@ class CodeFormatterSuite extends SparkFunSuite { } testCase("if else on the same line") { - """class A { + """ + |class A { | if (c) {duh;} else {boo;} - |}""".stripMargin + |} + """.stripMargin }{ """ |/* 001 */ class A { @@ -105,10 +114,12 @@ class CodeFormatterSuite extends SparkFunSuite { } testCase("function calls") { - """foo( + """ + |foo( |a, |b, - |c)""".stripMargin + |c) + """.stripMargin }{ """ |/* 001 */ foo( @@ -119,10 +130,12 @@ class CodeFormatterSuite extends SparkFunSuite { } testCase("single line comments") { - """// This is a comment about class A { { { ( ( + """ + |// This is a comment about class A { { { ( ( |class A { |class body; - |}""".stripMargin + |} + """.stripMargin }{ """ |/* 001 */ // This is a comment about class A { { { ( ( @@ -133,10 +146,12 @@ class CodeFormatterSuite extends SparkFunSuite { } testCase("single line comments /* */ ") { - """/** This is a comment about class A { { { ( ( */ + """ + |/** This is a comment about class A { { { ( ( */ |class A { |class body; - |}""".stripMargin + |} + """.stripMargin }{ """ |/* 001 */ /** This is a comment about class A { { { ( ( */ @@ -147,12 +162,14 @@ class CodeFormatterSuite extends SparkFunSuite { } testCase("multi-line comments") { - """ /* This is a comment about + """ + | /* This is a comment about |class A { |class body; ...*/ |class A { |class body; - |}""".stripMargin + |} + """.stripMargin }{ """ |/* 001 */ /* This is a comment about @@ -164,30 +181,56 @@ class 
CodeFormatterSuite extends SparkFunSuite { """.stripMargin } - // scalastyle:off whitespace.end.of.line testCase("reduce empty lines") { CodeFormatter.stripExtraNewLines( - """class A { + """ + |class A { | | - | /*** comment1 */ + | /* + | * multi + | * line + | * comment + | */ | | class body; | | | if (c) {duh;} | else {boo;} - |}""".stripMargin) + |} + """.stripMargin.trim) }{ """ |/* 001 */ class A { - |/* 002 */ /*** comment1 */ - |/* 003 */ class body; - |/* 004 */ - |/* 005 */ if (c) {duh;} - |/* 006 */ else {boo;} - |/* 007 */ } + |/* 002 */ /* + |/* 003 */ * multi + |/* 004 */ * line + |/* 005 */ * comment + |/* 006 */ */ + |/* 007 */ class body; + |/* 008 */ + |/* 009 */ if (c) {duh;} + |/* 010 */ else {boo;} + |/* 011 */ } + """.stripMargin + } + + testCase("comment place holder")( + """ + |/*c1*/ + |class A + |/*c2*/ + |class B + |/*c1*//*c2*/ + """.stripMargin, Map("c1" -> "/*abc*/", "c2" -> "/*xyz*/") + ) { + """ + |/* 001 */ /*abc*/ + |/* 002 */ class A + |/* 003 */ /*xyz*/ + |/* 004 */ class B + |/* 005 */ /*abc*//*xyz*/ """.stripMargin } - // scalastyle:on whitespace.end.of.line } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 725e14c0fb..0a2fb0ef50 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -581,21 +581,11 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext assert(CatalystReadSupport.expandUDT(schema) === expected) } - test("read/write wide table") { - withTempPath { dir => - val path = dir.getCanonicalPath - - val df = spark.range(1000).select(Seq.tabulate(1000) {i => ('id + i).as(s"c$i")} : _*) - df.write.mode(SaveMode.Overwrite).parquet(path) - 
checkAnswer(spark.read.parquet(path), df) - } - } - test("returning batch for wide table") { - withSQLConf("spark.sql.codegen.maxFields" -> "100") { + withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "10") { withTempPath { dir => val path = dir.getCanonicalPath - val df = spark.range(100).select(Seq.tabulate(110) {i => ('id + i).as(s"c$i")} : _*) + val df = spark.range(10).select(Seq.tabulate(11) {i => ('id + i).as(s"c$i")} : _*) df.write.mode(SaveMode.Overwrite).parquet(path) // donot return batch, because whole stage codegen is disabled for wide table (>200 columns) @@ -605,7 +595,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext checkAnswer(df2, df) // return batch - val columns = Seq.tabulate(90) {i => s"c$i"} + val columns = Seq.tabulate(9) {i => s"c$i"} val df3 = df2.selectExpr(columns : _*) assert( df3.queryExecution.sparkPlan.find(_.isInstanceOf[BatchedDataSourceScanExec]).isDefined, |