Diffstat (limited to 'sql')
 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala      | 7 ++++++-
 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala | 3 ++-
 2 files changed, 9 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index ee48a7b81d..c1a97de72f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -134,8 +134,13 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
           }
         case _ =>
-          val maxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes
+          val defaultMaxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes
           val openCostInBytes = files.sqlContext.conf.filesOpenCostInBytes
+          val defaultParallelism = files.sqlContext.sparkContext.defaultParallelism
+          val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
+          val bytesPerCore = totalBytes / defaultParallelism
+          val maxSplitBytes = Math.min(defaultMaxSplitBytes,
+            Math.max(openCostInBytes, bytesPerCore))
           logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
             s"open cost is considered as scanning $openCostInBytes bytes.")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 4699c48c72..50cd03a40c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, RawLocalFileSystem}
 import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.SparkConf
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper}
@@ -38,6 +39,8 @@ import org.apache.spark.util.Utils
 class FileSourceStrategySuite extends QueryTest with SharedSQLContext with PredicateHelper {
   import testImplicits._
+  protected override val sparkConf = new SparkConf().set("spark.default.parallelism", "1")
+
   test("unpartitioned table, single partition") {
     val table =
       createTable(
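
The suite pins spark.default.parallelism to 1 so that bytesPerCore, and therefore the computed maxSplitBytes, is deterministic instead of varying with the core count of the machine running the tests. As a rough, hypothetical sketch of exercising the same knobs in an application (the app name and path are placeholders, and this assumes the pre-2.0 SQLContext API used by the patch, with config keys corresponding to filesMaxPartitionBytes and filesOpenCostInBytes):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object SplitSizeDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SplitSizeDemo")
      .setMaster("local[4]")
      .set("spark.default.parallelism", "4")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // With parallelism 4, bytesPerCore is a quarter of the padded input size,
    // so a small input is split four ways instead of landing in one partition.
    sqlContext.setConf("spark.sql.files.maxPartitionBytes", (128L * 1024 * 1024).toString)
    sqlContext.setConf("spark.sql.files.openCostInBytes", (4L * 1024 * 1024).toString)
    sqlContext.read.parquet("/path/to/data").count() // placeholder path
    sc.stop()
  }
}
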