Diffstat (limited to 'core/src/main')
 core/src/main/scala/org/apache/spark/rdd/RDD.scala                |  2 +-
 core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala |  5 ++---
 core/src/main/scala/org/apache/spark/util/Utils.scala             | 15 +++++++++++++++
 3 files changed, 18 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 6dc334ceb5..be119578d2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1278,7 +1278,7 @@ abstract class RDD[T: ClassTag](
   def zipWithUniqueId(): RDD[(T, Long)] = withScope {
     val n = this.partitions.length.toLong
     this.mapPartitionsWithIndex { case (k, iter) =>
-      iter.zipWithIndex.map { case (item, i) =>
+      Utils.getIteratorZipWithIndex(iter, 0L).map { case (item, i) =>
         (item, i * n + k)
       }
     }
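
For context, and not part of the patch: zipWithUniqueId assigns the element at position i of partition k the id i * n + k, where n is the number of partitions, so ids are unique across partitions without a global pass; the change above only swaps the Int-indexed iter.zipWithIndex for the Long-indexed helper. A minimal usage sketch, assuming a live SparkContext named sc:

    // Illustrative only; ids follow i * n + k, here with n = 3 partitions.
    val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 3)
    rdd.zipWithUniqueId().collect()
    // typically Array((a,0), (b,1), (c,2), (d,5)); the exact ids depend on how
    // elements are split across partitions, but each id is a distinct Long
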
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
index b5738b9a95..b0e5ba0865 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
@@ -64,8 +64,7 @@ class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev)
 
   override def compute(splitIn: Partition, context: TaskContext): Iterator[(T, Long)] = {
     val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
-    firstParent[T].iterator(split.prev, context).zipWithIndex.map { x =>
-      (x._1, split.startIndex + x._2)
-    }
+    val parentIter = firstParent[T].iterator(split.prev, context)
+    Utils.getIteratorZipWithIndex(parentIter, split.startIndex)
   }
 }
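
For context, and not part of the patch: ZippedWithIndexRDD precomputes split.startIndex, the element count of all earlier partitions, and compute numbers the partition's elements from that offset; the rewrite above only delegates the counting to the shared Long-based helper. A usage sketch, again assuming a SparkContext sc:

    // Illustrative only; zipWithIndex assigns consecutive global indices as Longs.
    val indexed = sc.parallelize(Seq("x", "y", "z"), 2).zipWithIndex()
    indexed.collect()   // Array((x,0), (y,1), (z,2))
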
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 7fba901b85..bfc609419c 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1760,6 +1760,21 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Generate a zipWithIndex iterator, avoid index value overflowing problem
+   * in scala's zipWithIndex
+   */
+  def getIteratorZipWithIndex[T](iterator: Iterator[T], startIndex: Long): Iterator[(T, Long)] = {
+    new Iterator[(T, Long)] {
+      var index: Long = startIndex - 1L
+      def hasNext: Boolean = iterator.hasNext
+      def next(): (T, Long) = {
+        index += 1L
+        (iterator.next(), index)
+      }
+    }
+  }
+
+  /**
    * Creates a symlink.
    *
    * @param src absolute path to the source
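
Why the helper exists: Scala's Iterator.zipWithIndex counts with an Int, so an iterator longer than Int.MaxValue elements sees its index overflow to negative values, whereas getIteratorZipWithIndex counts with a Long and can start from a non-zero offset (used above for split.startIndex). The sketch below mirrors the added method for illustration only; it is not the committed code, the name zipWithLongIndex is invented for the sketch, and the real helper lives in private[spark] Utils, so it is only callable from Spark internals:

    // Stand-alone mirror of the helper; runnable in a plain Scala REPL.
    def zipWithLongIndex[T](iterator: Iterator[T], startIndex: Long): Iterator[(T, Long)] = {
      new Iterator[(T, Long)] {
        // Start one below startIndex so the first next() yields exactly startIndex.
        var index: Long = startIndex - 1L
        def hasNext: Boolean = iterator.hasNext
        def next(): (T, Long) = {
          index += 1L
          (iterator.next(), index)
        }
      }
    }

    zipWithLongIndex(Iterator("a", "b", "c"), 100L).toList
    // => List((a,100), (b,101), (c,102))
    // By contrast, Iterator("a", "b", "c").zipWithIndex yields Int indices and
    // cannot represent positions beyond Int.MaxValue.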