From 06e33985c631fe91e1c4cef6039b8752548cc435 Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Thu, 1 Sep 2016 17:08:33 -0700
Subject: [SPARK-16302][SQL] Set the right number of partitions for reading
 data from a local collection.

follow #13137

This PR sets the right number of partitions when reading data from a local
collection. A query such as 'val df = Seq((1, 2)).toDF("key", "value").count'
always uses defaultParallelism tasks, so it spawns many empty or small tasks.

Manually tested and checked.

Author: Lianhui Wang

Closes #13979 from lianhuiwang/localTable-Parallel.
---
 .../scala/org/apache/spark/sql/execution/LocalTableScanExec.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
index 556f482f4b..6598fa381a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
@@ -42,7 +42,10 @@ case class LocalTableScanExec(
     }
   }
 
-  private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows)
+  private lazy val numParallelism: Int = math.min(math.max(unsafeRows.length, 1),
+    sqlContext.sparkContext.defaultParallelism)
+
+  private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows, numParallelism)
 
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
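
Note (not part of the patch): a minimal standalone sketch of the partition-count
clamp that the diff introduces. The object and helper names below are
illustrative only; the math.min/math.max expression mirrors the patched code.

    // Hypothetical sketch: clamp the partition count between 1 and
    // defaultParallelism, so a tiny local relation does not fan out into
    // defaultParallelism mostly-empty tasks.
    object NumParallelismSketch {
      // `defaultParallelism` stands in for sqlContext.sparkContext.defaultParallelism.
      def numParallelism(numRows: Int, defaultParallelism: Int): Int =
        math.min(math.max(numRows, 1), defaultParallelism)

      def main(args: Array[String]): Unit = {
        // With defaultParallelism = 8:
        println(numParallelism(1, 8))   // 1 -> Seq((1, 2)).toDF(...).count runs 1 task, not 8
        println(numParallelism(0, 8))   // 1 -> an empty local relation still gets one partition
        println(numParallelism(100, 8)) // 8 -> capped at defaultParallelism
      }
    }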