aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorWenchen Fan <wenchen@databricks.com>2016-05-24 21:23:39 -0700
committerCheng Lian <lian@databricks.com>2016-05-24 21:23:39 -0700
commit50b660d725269dc0c11e0d350ddd7fc8b19539a0 (patch)
tree6cbf51ce284def719bd30f4f90d086e9693de2c1 /sql
parent4acababcaba567c85f3be0d5e939d99119b82d1d (diff)
downloadspark-50b660d725269dc0c11e0d350ddd7fc8b19539a0.tar.gz
spark-50b660d725269dc0c11e0d350ddd7fc8b19539a0.tar.bz2
spark-50b660d725269dc0c11e0d350ddd7fc8b19539a0.zip
[SPARK-15498][TESTS] fix slow tests
## What changes were proposed in this pull request? This PR fixes 3 slow tests: 1. `ParquetQuerySuite.read/write wide table`: This is not a good unit test as it runs more than 5 minutes. This PR removes it and adds a new regression test in `CodeGenerationSuite`, which is more "unit". 2. `ParquetQuerySuite.returning batch for wide table`: reduce the threshold and use smaller data size. 3. `DatasetSuite.SPARK-14554: Dataset.map may generate wrong java code for wide table`: Improving `CodeFormatter.format` (introduced at https://github.com/apache/spark/pull/12979) can dramatically speed it up. ## How was this patch tested? N/A Author: Wenchen Fan <wenchen@databricks.com> Closes #13273 from cloud-fan/test.
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala48
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala22
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala14
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala107
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala16
5 files changed, 128 insertions, 79 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala
index 855ae6432d..05b7c96e44 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.expressions.codegen
-import org.apache.commons.lang3.StringUtils
+import java.util.regex.Matcher
/**
* An utility class that indents a block of code based on the curly braces and parentheses.
@@ -26,13 +26,17 @@ import org.apache.commons.lang3.StringUtils
* Written by Matei Zaharia.
*/
object CodeFormatter {
+ val commentHolder = """\/\*(.+?)\*\/""".r
+
def format(code: CodeAndComment): String = {
- new CodeFormatter().addLines(
- StringUtils.replaceEach(
- code.body,
- code.comment.keys.toArray,
- code.comment.values.toArray)
- ).result
+ val formatter = new CodeFormatter
+ code.body.split("\n").foreach { line =>
+ val commentReplaced = commentHolder.replaceAllIn(
+ line.trim,
+ m => code.comment.get(m.group(1)).map(Matcher.quoteReplacement).getOrElse(m.group(0)))
+ formatter.addLine(commentReplaced)
+ }
+ formatter.result()
}
def stripExtraNewLines(input: String): String = {
@@ -53,16 +57,28 @@ object CodeFormatter {
def stripOverlappingComments(codeAndComment: CodeAndComment): CodeAndComment = {
val code = new StringBuilder
val map = codeAndComment.comment
+
+ def getComment(line: String): Option[String] = {
+ if (line.startsWith("/*") && line.endsWith("*/")) {
+ map.get(line.substring(2, line.length - 2))
+ } else {
+ None
+ }
+ }
+
var lastLine: String = "dummy"
codeAndComment.body.split('\n').foreach { l =>
val line = l.trim()
- val skip = lastLine.startsWith("/*") && lastLine.endsWith("*/") &&
- line.startsWith("/*") && line.endsWith("*/") &&
- map(lastLine).substring(3).contains(map(line).substring(3))
+
+ val skip = getComment(lastLine).zip(getComment(line)).exists {
+ case (lastComment, currentComment) =>
+ lastComment.substring(3).contains(currentComment.substring(3))
+ }
+
if (!skip) {
- code.append(line)
- code.append("\n")
+ code.append(line).append("\n")
}
+
lastLine = line
}
new CodeAndComment(code.result().trim(), map)
@@ -117,8 +133,9 @@ private class CodeFormatter {
} else {
indentString
}
- code.append(f"/* ${currentLine}%03d */ ")
+ code.append(f"/* ${currentLine}%03d */")
if (line.trim().length > 0) {
+ code.append(" ") // add a space after the line number comment.
code.append(thisLineIndent)
if (inCommentBlock && line.startsWith("*") || line.startsWith("*/")) code.append(" ")
code.append(line)
@@ -129,10 +146,5 @@ private class CodeFormatter {
currentLine += 1
}
- private def addLines(code: String): CodeFormatter = {
- code.split('\n').foreach(s => addLine(s.trim()))
- this
- }
-
private def result(): String = code.result()
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 8b74d606db..2706c38d9e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -718,27 +718,17 @@ class CodegenContext {
def getPlaceHolderToComments(): collection.Map[String, String] = placeHolderToComments
/**
- * Register a multi-line comment and return the corresponding place holder
- */
- private def registerMultilineComment(text: String): String = {
- val placeHolder = s"/*${freshName("c")}*/"
- val comment = text.split("(\r\n)|\r|\n").mkString("/**\n * ", "\n * ", "\n */")
- placeHolderToComments += (placeHolder -> comment)
- placeHolder
- }
-
- /**
* Register a comment and return the corresponding place holder
*/
def registerComment(text: String): String = {
- if (text.contains("\n") || text.contains("\r")) {
- registerMultilineComment(text)
+ val name = freshName("c")
+ val comment = if (text.contains("\n") || text.contains("\r")) {
+ text.split("(\r\n)|\r|\n").mkString("/**\n * ", "\n * ", "\n */")
} else {
- val placeHolder = s"/*${freshName("c")}*/"
- val safeComment = s"// $text"
- placeHolderToComments += (placeHolder -> safeComment)
- placeHolder
+ s"// $text"
}
+ placeHolderToComments += (name -> comment)
+ s"/*$name*/"
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
index db34d12e28..8ffe39084c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.objects.CreateExternalRow
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -137,6 +138,19 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
}
}
+ test("SPARK-14224: split wide external row creation into blocks due to JVM code size limit") {
+ val length = 5000
+ val schema = StructType(Seq.fill(length)(StructField("int", IntegerType)))
+ val expressions = Seq(CreateExternalRow(Seq.fill(length)(Literal(1)), schema))
+ val plan = GenerateMutableProjection.generate(expressions)
+ val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
+ val expected = Seq(Row.fromSeq(Seq.fill(length)(1)))
+
+ if (!checkResult(actual, expected)) {
+ fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected")
+ }
+ }
+
test("test generated safe and unsafe projection") {
val schema = new StructType(Array(
StructField("a", StringType, true),
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala
index 76afc2e8ec..bc5a8f0782 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala
@@ -23,9 +23,10 @@ import org.apache.spark.sql.catalyst.util._
class CodeFormatterSuite extends SparkFunSuite {
- def testCase(name: String)(input: String)(expected: String): Unit = {
+ def testCase(name: String)(
+ input: String, comment: Map[String, String] = Map.empty)(expected: String): Unit = {
test(name) {
- val sourceCode = new CodeAndComment(input, Map.empty)
+ val sourceCode = new CodeAndComment(input.trim, comment)
if (CodeFormatter.format(sourceCode).trim !== expected.trim) {
fail(
s"""
@@ -43,9 +44,9 @@ class CodeFormatterSuite extends SparkFunSuite {
|/*project_c2*/
""".stripMargin,
Map(
- "/*project_c4*/" -> "// (((input[0, bigint, false] + 1) + 2) + 3))",
- "/*project_c3*/" -> "// ((input[0, bigint, false] + 1) + 2)",
- "/*project_c2*/" -> "// (input[0, bigint, false] + 1)"
+ "project_c4" -> "// (((input[0, bigint, false] + 1) + 2) + 3))",
+ "project_c3" -> "// ((input[0, bigint, false] + 1) + 2)",
+ "project_c2" -> "// (input[0, bigint, false] + 1)"
))
val reducedCode = CodeFormatter.stripOverlappingComments(code)
@@ -53,9 +54,11 @@ class CodeFormatterSuite extends SparkFunSuite {
}
testCase("basic example") {
- """class A {
+ """
+ |class A {
|blahblah;
- |}""".stripMargin
+ |}
+ """.stripMargin
}{
"""
|/* 001 */ class A {
@@ -65,11 +68,13 @@ class CodeFormatterSuite extends SparkFunSuite {
}
testCase("nested example") {
- """class A {
+ """
+ |class A {
| if (c) {
|duh;
|}
- |}""".stripMargin
+ |}
+ """.stripMargin
} {
"""
|/* 001 */ class A {
@@ -81,9 +86,11 @@ class CodeFormatterSuite extends SparkFunSuite {
}
testCase("single line") {
- """class A {
+ """
+ |class A {
| if (c) {duh;}
- |}""".stripMargin
+ |}
+ """.stripMargin
}{
"""
|/* 001 */ class A {
@@ -93,9 +100,11 @@ class CodeFormatterSuite extends SparkFunSuite {
}
testCase("if else on the same line") {
- """class A {
+ """
+ |class A {
| if (c) {duh;} else {boo;}
- |}""".stripMargin
+ |}
+ """.stripMargin
}{
"""
|/* 001 */ class A {
@@ -105,10 +114,12 @@ class CodeFormatterSuite extends SparkFunSuite {
}
testCase("function calls") {
- """foo(
+ """
+ |foo(
|a,
|b,
- |c)""".stripMargin
+ |c)
+ """.stripMargin
}{
"""
|/* 001 */ foo(
@@ -119,10 +130,12 @@ class CodeFormatterSuite extends SparkFunSuite {
}
testCase("single line comments") {
- """// This is a comment about class A { { { ( (
+ """
+ |// This is a comment about class A { { { ( (
|class A {
|class body;
- |}""".stripMargin
+ |}
+ """.stripMargin
}{
"""
|/* 001 */ // This is a comment about class A { { { ( (
@@ -133,10 +146,12 @@ class CodeFormatterSuite extends SparkFunSuite {
}
testCase("single line comments /* */ ") {
- """/** This is a comment about class A { { { ( ( */
+ """
+ |/** This is a comment about class A { { { ( ( */
|class A {
|class body;
- |}""".stripMargin
+ |}
+ """.stripMargin
}{
"""
|/* 001 */ /** This is a comment about class A { { { ( ( */
@@ -147,12 +162,14 @@ class CodeFormatterSuite extends SparkFunSuite {
}
testCase("multi-line comments") {
- """ /* This is a comment about
+ """
+ | /* This is a comment about
|class A {
|class body; ...*/
|class A {
|class body;
- |}""".stripMargin
+ |}
+ """.stripMargin
}{
"""
|/* 001 */ /* This is a comment about
@@ -164,30 +181,56 @@ class CodeFormatterSuite extends SparkFunSuite {
""".stripMargin
}
- // scalastyle:off whitespace.end.of.line
testCase("reduce empty lines") {
CodeFormatter.stripExtraNewLines(
- """class A {
+ """
+ |class A {
|
|
- | /*** comment1 */
+ | /*
+ | * multi
+ | * line
+ | * comment
+ | */
|
| class body;
|
|
| if (c) {duh;}
| else {boo;}
- |}""".stripMargin)
+ |}
+ """.stripMargin.trim)
}{
"""
|/* 001 */ class A {
- |/* 002 */ /*** comment1 */
- |/* 003 */ class body;
- |/* 004 */
- |/* 005 */ if (c) {duh;}
- |/* 006 */ else {boo;}
- |/* 007 */ }
+ |/* 002 */ /*
+ |/* 003 */ * multi
+ |/* 004 */ * line
+ |/* 005 */ * comment
+ |/* 006 */ */
+ |/* 007 */ class body;
+ |/* 008 */
+ |/* 009 */ if (c) {duh;}
+ |/* 010 */ else {boo;}
+ |/* 011 */ }
+ """.stripMargin
+ }
+
+ testCase("comment place holder")(
+ """
+ |/*c1*/
+ |class A
+ |/*c2*/
+ |class B
+ |/*c1*//*c2*/
+ """.stripMargin, Map("c1" -> "/*abc*/", "c2" -> "/*xyz*/")
+ ) {
+ """
+ |/* 001 */ /*abc*/
+ |/* 002 */ class A
+ |/* 003 */ /*xyz*/
+ |/* 004 */ class B
+ |/* 005 */ /*abc*//*xyz*/
""".stripMargin
}
- // scalastyle:on whitespace.end.of.line
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 725e14c0fb..0a2fb0ef50 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -581,21 +581,11 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
assert(CatalystReadSupport.expandUDT(schema) === expected)
}
- test("read/write wide table") {
- withTempPath { dir =>
- val path = dir.getCanonicalPath
-
- val df = spark.range(1000).select(Seq.tabulate(1000) {i => ('id + i).as(s"c$i")} : _*)
- df.write.mode(SaveMode.Overwrite).parquet(path)
- checkAnswer(spark.read.parquet(path), df)
- }
- }
-
test("returning batch for wide table") {
- withSQLConf("spark.sql.codegen.maxFields" -> "100") {
+ withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "10") {
withTempPath { dir =>
val path = dir.getCanonicalPath
- val df = spark.range(100).select(Seq.tabulate(110) {i => ('id + i).as(s"c$i")} : _*)
+ val df = spark.range(10).select(Seq.tabulate(11) {i => ('id + i).as(s"c$i")} : _*)
df.write.mode(SaveMode.Overwrite).parquet(path)
// donot return batch, because whole stage codegen is disabled for wide table (>200 columns)
@@ -605,7 +595,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
checkAnswer(df2, df)
// return batch
- val columns = Seq.tabulate(90) {i => s"c$i"}
+ val columns = Seq.tabulate(9) {i => s"c$i"}
val df3 = df2.selectExpr(columns : _*)
assert(
df3.queryExecution.sparkPlan.find(_.isInstanceOf[BatchedDataSourceScanExec]).isDefined,