diff options
author | Dilip Biswal <dbiswal@us.ibm.com> | 2016-10-27 13:12:14 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2016-10-27 13:12:14 +0800 |
commit | dd4f088c1df6abd728e5544a17ba85322bedfe4c (patch) | |
tree | 65cfc1de26f39fe81e415f53831a823615256afa | |
parent | f1aeed8b022e043de2eb38b30187dcc36ee8dcdb (diff) | |
download | spark-dd4f088c1df6abd728e5544a17ba85322bedfe4c.tar.gz spark-dd4f088c1df6abd728e5544a17ba85322bedfe4c.tar.bz2 spark-dd4f088c1df6abd728e5544a17ba85322bedfe4c.zip |
[SPARK-18009][SQL] Fix ClassCastException while calling toLocalIterator() on dataframe produced by RunnableCommand
## What changes were proposed in this pull request?
A short code snippet that uses toLocalIterator() on a dataframe produced by a RunnableCommand
reproduces the problem. toLocalIterator() is called by thriftserver when
`spark.sql.thriftServer.incrementalCollect`is set to handle queries producing large result
set.
**Before**
```SQL
scala> spark.sql("show databases")
res0: org.apache.spark.sql.DataFrame = [databaseName: string]
scala> res0.toLocalIterator()
16/10/26 03:00:24 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow
```
**After**
```SQL
scala> spark.sql("drop database databases")
res30: org.apache.spark.sql.DataFrame = []
scala> spark.sql("show databases")
res31: org.apache.spark.sql.DataFrame = [databaseName: string]
scala> res31.toLocalIterator().asScala foreach println
[default]
[parquet]
```
## How was this patch tested?
Added a test in DDLSuite
Author: Dilip Biswal <dbiswal@us.ibm.com>
Closes #15642 from dilipbiswal/SPARK-18009.
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala | 2 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala | 7 |
2 files changed, 9 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 698c625d61..d82e54e575 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -66,6 +66,8 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan { override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray + override def executeToIterator: Iterator[InternalRow] = sideEffectResult.toIterator + override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray protected override def doExecute(): RDD[InternalRow] = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index de326f80f6..b989d01ec7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1805,4 +1805,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } } + + test("SPARK-18009 calling toLocalIterator on commands") { + import scala.collection.JavaConverters._ + val df = sql("show databases") + val rows: Seq[Row] = df.toLocalIterator().asScala.toSeq + assert(rows.length > 0) + } } |