diff options
author | Wesley Tang <tangmingjun@mininglamp.com> | 2016-07-29 04:26:05 -0700 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-07-29 04:26:05 -0700 |
commit | d1d5069aa3744d46abd3889abab5f15e9067382a (patch) | |
tree | d5275f95e564fcf8da7a0fc4ac642152b7016e35 | |
parent | 274f3b9ec86e4109c7678eef60f990d41dc3899f (diff) | |
download | spark-d1d5069aa3744d46abd3889abab5f15e9067382a.tar.gz spark-d1d5069aa3744d46abd3889abab5f15e9067382a.tar.bz2 spark-d1d5069aa3744d46abd3889abab5f15e9067382a.zip |
[SPARK-16664][SQL] Fix persist call on Data frames with more than 200…
## What changes were proposed in this pull request?
f12f11e578169b47e3f8b18b299948c0670ba585 introduced this bug, missed foreach as map
## How was this patch tested?
Test added
Author: Wesley Tang <tangmingjun@mininglamp.com>
Closes #14324 from breakdawn/master.
3 files changed, 12 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala index 7a14879b8b..96bd338f09 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala @@ -127,7 +127,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera val groupedAccessorsItr = initializeAccessors.grouped(numberOfStatementsThreshold) val groupedExtractorsItr = extractors.grouped(numberOfStatementsThreshold) var groupedAccessorsLength = 0 - groupedAccessorsItr.zipWithIndex.map { case (body, i) => + groupedAccessorsItr.zipWithIndex.foreach { case (body, i) => groupedAccessorsLength += 1 val funcName = s"accessors$i" val funcCode = s""" @@ -137,7 +137,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera """.stripMargin ctx.addNewFunction(funcName, funcCode) } - groupedExtractorsItr.zipWithIndex.map { case (body, i) => + groupedExtractorsItr.zipWithIndex.foreach { case (body, i) => val funcName = s"extractors$i" val funcCode = s""" |private void $funcName() { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 905da554f1..62cfd24041 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1571,4 +1571,12 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { checkAnswer(joined, Row("x", null, null)) checkAnswer(joined.filter($"new".isNull), Row("x", null, null)) } + + test("SPARK-16664: persist with more than 200 columns") { + val size = 201L + val rdd = sparkContext.makeRDD(Seq(Row.fromSeq(Seq.range(0, size)))) + val schemas = List.range(0, size).map(a => StructField("name" + a, LongType, true)) + val df = spark.createDataFrame(rdd, StructType(schemas), false) + assert(df.persist.take(1).apply(0).toSeq(100).asInstanceOf[Long] == 100) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index af3ed14c12..937839644a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -227,7 +227,8 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { val columnTypes1 = List.fill(length1)(IntegerType) val columnarIterator1 = GenerateColumnAccessor.generate(columnTypes1) - val length2 = 10000 + // SPARK-16664: the limit of janino is 8117 + val length2 = 8117 val columnTypes2 = List.fill(length2)(IntegerType) val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2) } |