diff options
author | Yin Huai <yhuai@databricks.com> | 2015-05-15 12:04:26 +0800 |
---|---|---|
committer | Cheng Lian <lian@databricks.com> | 2015-05-15 12:04:26 +0800 |
commit | e8f0e016eaf80a363796dd0a094291dcb3b35793 (patch) | |
tree | 59e8659a55ba07e678f60ea41dd27679139a92c1 /sql | |
parent | f9705d461350c6fccf8022e933ea909f40c53576 (diff) | |
download | spark-e8f0e016eaf80a363796dd0a094291dcb3b35793.tar.gz spark-e8f0e016eaf80a363796dd0a094291dcb3b35793.tar.bz2 spark-e8f0e016eaf80a363796dd0a094291dcb3b35793.zip |
[SQL] When creating partitioned table scan, explicitly create UnionRDD.
Otherwise, it will cause a stack overflow when there are many partitions.
Author: Yin Huai <yhuai@databricks.com>
Closes #6162 from yhuai/partitionUnionedRDD and squashes the following commits:
fa016d8 [Yin Huai] Explicitly create UnionRDD.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala | 11 |
1 file changed, 7 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala index a5410cda0f..ee099ab959 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala @@ -21,7 +21,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.Logging import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.{UnionRDD, RDD} import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ @@ -169,9 +169,12 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { scan.execute() } - val unionedRows = perPartitionRows.reduceOption(_ ++ _).getOrElse { - relation.sqlContext.emptyResult - } + val unionedRows = + if (perPartitionRows.length == 0) { + relation.sqlContext.emptyResult + } else { + new UnionRDD(relation.sqlContext.sparkContext, perPartitionRows) + } createPhysicalRDD(logicalRelation.relation, output, unionedRows) } |