aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Darek Blasiak <darek.blasiak@640labs.com> 2016-01-07 21:15:40 +0000
committer: Sean Owen <sowen@cloudera.com> 2016-01-07 21:15:40 +0000
commit: 8346518357f4a3565ae41e9a5ccd7e2c3ed6c468 (patch)
tree: 779a668d55d24f64a7142a8d17d71ae0d16eb62f
parent: 1b2c2162af4d5d2d950af94571e69273b49bf913 (diff)
download: spark-8346518357f4a3565ae41e9a5ccd7e2c3ed6c468.tar.gz
          spark-8346518357f4a3565ae41e9a5ccd7e2c3ed6c468.tar.bz2
          spark-8346518357f4a3565ae41e9a5ccd7e2c3ed6c468.zip
[SPARK-12598][CORE] bug in setMinPartitions
There is a bug in the calculation of `maxSplitSize`. The `totalLen` should be divided by `minPartitions` and not by `files.size`.

Author: Darek Blasiak <darek.blasiak@640labs.com>

Closes #10546 from datafarmer/setminpartitionsbug.
-rw-r--r-- core/src/main/scala/org/apache/spark/input/PortableDataStream.scala 5
1 files changed, 2 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index 8009491a1b..18cb7631b3 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -41,9 +41,8 @@ private[spark] abstract class StreamFileInputFormat[T]
* which is set through setMaxSplitSize
*/
def setMinPartitions(context: JobContext, minPartitions: Int) {
- val files = listStatus(context).asScala
- val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
- val maxSplitSize = Math.ceil(totalLen * 1.0 / files.size).toLong
+ val totalLen = listStatus(context).asScala.filterNot(_.isDirectory).map(_.getLen).sum
+ val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong
super.setMaxSplitSize(maxSplitSize)
}