commit    06e33985c631fe91e1c4cef6039b8752548cc435
tree      e0096325b75f54a9c8e0397382e503c27c6a79bb
parent    5bea8757cce0b5e7c1f1ab9cd767c76fc63e2978
author    Lianhui Wang <lianhuiwang09@gmail.com>    2016-09-01 17:08:33 -0700
committer Josh Rosen <joshrosen@databricks.com>     2016-09-01 17:09:23 -0700
[SPARK-16302][SQL] Set the right number of partitions for reading data from a local collection.
Follow-up to #13137. This PR sets the right number of partitions when reading data from a local collection.

A query like `val df = Seq((1, 2)).toDF("key", "value").count` always uses `defaultParallelism` tasks, so it launches many empty or tiny tasks.
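For illustration, a minimal standalone sketch (not part of the patch) that makes the symptom observable; the `local[8]` master URL, app name, and object name are arbitrary assumptions, and the printed partition count is a rough way to observe the scan's parallelism:

```scala
import org.apache.spark.sql.SparkSession

object LocalTableParallelismDemo {
  def main(args: Array[String]): Unit = {
    // With this master URL, defaultParallelism is 8.
    val spark = SparkSession.builder()
      .master("local[8]")
      .appName("local-table-parallelism-demo")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, 2)).toDF("key", "value")
    // Before this patch the local-table scan was parallelized into
    // defaultParallelism (8) partitions, most of them empty; with the
    // patch it is clamped to min(max(1, 1), 8) = 1 partition.
    println(df.rdd.getNumPartitions)
    println(df.count())

    spark.stop()
  }
}
```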
Manually tested and checked.
Author: Lianhui Wang <lianhuiwang09@gmail.com>
Closes #13979 from lianhuiwang/localTable-Parallel.
 sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
```diff
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
index 556f482f4b..6598fa381a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
@@ -42,7 +42,10 @@ case class LocalTableScanExec(
     }
   }
 
-  private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows)
+  private lazy val numParallelism: Int = math.min(math.max(unsafeRows.length, 1),
+    sqlContext.sparkContext.defaultParallelism)
+
+  private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows, numParallelism)
 
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
```
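For reference, a tiny self-contained sketch of the clamping arithmetic introduced above; `defaultParallelism = 8` is an assumed value (it comes from the `SparkContext` in the real code), and the helper merely mirrors the new `numParallelism` field:

```scala
object NumParallelismExamples extends App {
  val defaultParallelism = 8 // assumed; supplied by the SparkContext in the patch

  // Mirrors: math.min(math.max(unsafeRows.length, 1), defaultParallelism)
  def numParallelism(numRows: Int): Int =
    math.min(math.max(numRows, 1), defaultParallelism)

  println(numParallelism(1))    // 1 row     -> 1 partition instead of 8
  println(numParallelism(0))    // empty     -> 1 (parallelize needs at least one slice)
  println(numParallelism(1000)) // many rows -> capped at defaultParallelism (8)
}
```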