author    Lianhui Wang <lianhuiwang09@gmail.com>    2016-09-01 17:08:33 -0700
committer Josh Rosen <joshrosen@databricks.com>     2016-09-01 17:09:23 -0700
commit    06e33985c631fe91e1c4cef6039b8752548cc435 (patch)
tree      e0096325b75f54a9c8e0397382e503c27c6a79bb
parent    5bea8757cce0b5e7c1f1ab9cd767c76fc63e2978 (diff)
[SPARK-16302][SQL] Set the right number of partitions for reading data from a local collection.
Follow-up to #13137. This PR sets the right number of partitions when reading data from a local collection. A query such as 'Seq((1, 2)).toDF("key", "value").count()' always used defaultParallelism tasks, which caused many empty or small tasks to run. Manually tested and checked.

Author: Lianhui Wang <lianhuiwang09@gmail.com>

Closes #13979 from lianhuiwang/localTable-Parallel.
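The new partition count follows a simple clamp rule: at most one task per row, at least one partition overall, capped by defaultParallelism. A minimal standalone sketch of that rule (the helper name and the sample numbers are illustrative, not part of the patch):

    // Sketch of the rule introduced by this patch.
    def numParallelism(numRows: Int, defaultParallelism: Int): Int =
      math.min(math.max(numRows, 1), defaultParallelism)

    numParallelism(0, 8)    // 1 -> an empty local relation still gets one partition
    numParallelism(1, 8)    // 1 -> Seq((1, 2)).toDF("key", "value") runs one task, not eight
    numParallelism(100, 8)  // 8 -> large collections stay capped at defaultParallelism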
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
index 556f482f4b..6598fa381a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
@@ -42,7 +42,10 @@ case class LocalTableScanExec(
     }
   }
 
-  private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows)
+  private lazy val numParallelism: Int = math.min(math.max(unsafeRows.length, 1),
+    sqlContext.sparkContext.defaultParallelism)
+
+  private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows, numParallelism)
 
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
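To sanity-check the effect, one could run something like the following in spark-shell (a hypothetical session; the old partition count depends on the machine's defaultParallelism):

    // With this patch, a one-row local collection is scanned with a single
    // partition instead of fanning out to defaultParallelism mostly-empty tasks.
    val df = Seq((1, 2)).toDF("key", "value")
    df.queryExecution.toRdd.getNumPartitions  // 1 after this patch; defaultParallelism before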