diff options
author | WeichenXu <WeichenXu123@outlook.com> | 2016-10-19 23:41:38 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-10-19 23:41:38 -0700 |
commit | 39755169fb5bb07332eef263b4c18ede1528812d (patch) | |
tree | 18acc7ee6c86199aeb609d2e22351b94fb691659 /core/src/main | |
parent | f313117bc93b0bf560528b316d3e6947caa96296 (diff) | |
download | spark-39755169fb5bb07332eef263b4c18ede1528812d.tar.gz spark-39755169fb5bb07332eef263b4c18ede1528812d.tar.bz2 spark-39755169fb5bb07332eef263b4c18ede1528812d.zip |
[SPARK-18003][SPARK CORE] Fix bug of RDD zipWithIndex & zipWithUniqueId index value overflowing
## What changes were proposed in this pull request?
- Fix a bug where RDD `zipWithIndex` generates an incorrect result when a single partition contains more than 2147483647 records (the index overflows `Int`).
- Fix a bug where RDD `zipWithUniqueId` generates an incorrect result when a single partition contains more than 2147483647 records (the index overflows `Int`).
## How was this patch tested?
A test was added.
Author: WeichenXu <WeichenXu123@outlook.com>
Closes #15550 from WeichenXu123/fix_rdd_zipWithIndex_overflow.
Diffstat (limited to 'core/src/main')
3 files changed, 18 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 6dc334ceb5..be119578d2 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1278,7 +1278,7 @@ abstract class RDD[T: ClassTag]( def zipWithUniqueId(): RDD[(T, Long)] = withScope { val n = this.partitions.length.toLong this.mapPartitionsWithIndex { case (k, iter) => - iter.zipWithIndex.map { case (item, i) => + Utils.getIteratorZipWithIndex(iter, 0L).map { case (item, i) => (item, i * n + k) } } diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala index b5738b9a95..b0e5ba0865 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala @@ -64,8 +64,7 @@ class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev) override def compute(splitIn: Partition, context: TaskContext): Iterator[(T, Long)] = { val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition] - firstParent[T].iterator(split.prev, context).zipWithIndex.map { x => - (x._1, split.startIndex + x._2) - } + val parentIter = firstParent[T].iterator(split.prev, context) + Utils.getIteratorZipWithIndex(parentIter, split.startIndex) } } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 7fba901b85..bfc609419c 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1760,6 +1760,21 @@ private[spark] object Utils extends Logging { } /** + * Generate a zipWithIndex iterator, avoid index value overflowing problem + * in scala's zipWithIndex + */ + def getIteratorZipWithIndex[T](iterator: Iterator[T], startIndex: Long): Iterator[(T, Long)] = { + new Iterator[(T, Long)] { + 
var index: Long = startIndex - 1L + def hasNext: Boolean = iterator.hasNext + def next(): (T, Long) = { + index += 1L + (iterator.next(), index) + } + } + } + + /** * Creates a symlink. * * @param src absolute path to the source |