diff options
author | codlife <1004910847@qq.com> | 2016-09-12 12:10:46 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-09-12 12:10:46 +0100 |
commit | 4efcdb7feae24e41d8120b59430f8b77cc2106a6 (patch) | |
tree | 83178b915686b487c9c05dd915cb096c0b1a26f6 /core/src | |
parent | cc87280fcd065b01667ca7a59a1a32c7ab757355 (diff) | |
download | spark-4efcdb7feae24e41d8120b59430f8b77cc2106a6.tar.gz spark-4efcdb7feae24e41d8120b59430f8b77cc2106a6.tar.bz2 spark-4efcdb7feae24e41d8120b59430f8b77cc2106a6.zip |
[SPARK-17447] Performance improvement in Partitioner.defaultPartitioner without sortBy
## What changes were proposed in this pull request?
if there are many rdds in some situations,the sort will loss he performance servely,actually we needn't sort the rdds , we can just scan the rdds one time to gain the same goal.
## How was this patch tested?
manual tests
Author: codlife <1004910847@qq.com>
Closes #15039 from codlife/master.
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/org/apache/spark/Partitioner.scala | 16 |
1 files changed, 9 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index 98c3abe93b..93dfbc0e6e 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -55,14 +55,16 @@ object Partitioner { * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD. */ def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = { - val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.length).reverse - for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) { - return r.partitioner.get - } - if (rdd.context.conf.contains("spark.default.parallelism")) { - new HashPartitioner(rdd.context.defaultParallelism) + val rdds = (Seq(rdd) ++ others) + val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0)) + if (hasPartitioner.nonEmpty) { + hasPartitioner.maxBy(_.partitions.length).partitioner.get } else { - new HashPartitioner(bySize.head.partitions.length) + if (rdd.context.conf.contains("spark.default.parallelism")) { + new HashPartitioner(rdd.context.defaultParallelism) + } else { + new HashPartitioner(rdds.map(_.partitions.length).max) + } } } } |