about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala 34
-rw-r--r-- core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala 17
-rw-r--r-- graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala 4
3 files changed, 30 insertions, 25 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala
index 2f1f907c6c..5f95559f15 100644
--- a/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala
@@ -24,7 +24,6 @@ import java.util.{HashMap => JHashMap, BitSet => JBitSet, HashSet => JHashSet}
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
-
import scala.collection.mutable.BitSet
@@ -72,7 +71,7 @@ class RDDIndex[@specialized K: ClassManifest](private[spark] val rdd: RDD[BlockI
*/
class IndexedRDD[K: ClassManifest, V: ClassManifest](
@transient val index: RDDIndex[K],
- @transient val valuesRDD: RDD[ (Seq[V], BitSet) ])
+ @transient val valuesRDD: RDD[ (IndexedSeq[V], BitSet) ])
extends RDD[(K, V)](index.rdd.context,
List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) {
@@ -119,13 +118,14 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
*/
def mapValues[U: ClassManifest](f: V => U): IndexedRDD[K, U] = {
val cleanF = index.rdd.context.clean(f)
- val newValuesRDD = valuesRDD.mapPartitions(iter => iter.map{
+ val newValuesRDD: RDD[ (IndexedSeq[U], BitSet) ] =
+ valuesRDD.mapPartitions(iter => iter.map{
case (values, bs) =>
val newValues = new Array[U](values.size)
for ( ind <- bs ) {
newValues(ind) = f(values(ind))
}
- (newValues.toSeq, bs)
+ (newValues.toIndexedSeq, bs)
}, preservesPartitioning = true)
new IndexedRDD[K,U](index, newValuesRDD)
}
@@ -137,7 +137,8 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
*/
def mapValuesWithKeys[U: ClassManifest](f: (K, V) => U): IndexedRDD[K, U] = {
val cleanF = index.rdd.context.clean(f)
- val newValues = index.rdd.zipPartitions(valuesRDD){
+ val newValues: RDD[ (IndexedSeq[U], BitSet) ] =
+ index.rdd.zipPartitions(valuesRDD){
(keysIter, valuesIter) =>
val index = keysIter.next()
assert(keysIter.hasNext() == false)
@@ -149,7 +150,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
for( (k,i) <- index ) {
if (bs(i)) { newValues(i) = f(k, oldValues(i)) }
}
- Array((newValues.toSeq, bs)).iterator
+ Array((newValues.toIndexedSeq, bs)).iterator
}
new IndexedRDD[K,U](index, newValues)
}
@@ -159,7 +160,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
if(index != other.index) {
throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
}
- val newValuesRDD: RDD[ (Seq[(V,W)], BitSet) ] = valuesRDD.zipPartitions(other.valuesRDD){
+ val newValuesRDD: RDD[ (IndexedSeq[(V,W)], BitSet) ] = valuesRDD.zipPartitions(other.valuesRDD){
(thisIter, otherIter) =>
val (thisValues, thisBS) = thisIter.next()
assert(!thisIter.hasNext)
@@ -167,7 +168,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
assert(!otherIter.hasNext)
val newBS = thisBS & otherBS
val newValues = thisValues.view.zip(otherValues)
- Iterator((newValues, newBS))
+ Iterator((newValues.toIndexedSeq, newBS))
}
new IndexedRDD(index, newValuesRDD)
}
@@ -177,7 +178,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
if(index != other.index) {
throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
}
- val newValuesRDD: RDD[ (Seq[(V,Option[W])], BitSet) ] = valuesRDD.zipPartitions(other.valuesRDD){
+ val newValuesRDD: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] = valuesRDD.zipPartitions(other.valuesRDD){
(thisIter, otherIter) =>
val (thisValues, thisBS) = thisIter.next()
assert(!thisIter.hasNext)
@@ -186,7 +187,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
val otherOption = otherValues.view.zipWithIndex
.map{ (x: (W, Int)) => if(otherBS(x._2)) Option(x._1) else None }
val newValues = thisValues.view.zip(otherOption)
- Iterator((newValues, thisBS))
+ Iterator((newValues.toIndexedSeq, thisBS))
}
new IndexedRDD(index, newValuesRDD)
}
@@ -197,6 +198,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
other: RDD[(K,W)], merge: (W,W) => W = (a:W, b:W) => a):
IndexedRDD[K, (V, Option[W]) ] = {
val cleanMerge = index.rdd.context.clean(merge)
+
other match {
case other: IndexedRDD[_, _] if index == other.index => {
leftZipJoin(other)
@@ -211,7 +213,8 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
val otherShuffled =
if (other.partitioner == Some(partitioner)) other
else other.partitionBy(partitioner)
- val newValues = index.rdd.zipPartitions(valuesRDD, other) {
+ val newValues: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] =
+ index.rdd.zipPartitions(valuesRDD, other) {
(thisIndexIter, thisIter, tuplesIter) =>
val index = thisIndexIter.next()
assert(!thisIndexIter.hasNext)
@@ -236,7 +239,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
.map{ (x: (W, Int)) => if(wBS(x._2)) Option(x._1) else None }
val newValues = thisValues.view.zip(otherOption)
- Iterator((newValues.toSeq, thisBS))
+ Iterator((newValues.toIndexedSeq, thisBS))
} // end of newValues
new IndexedRDD(index, newValues)
}
@@ -496,11 +499,12 @@ object IndexedRDD {
values(ind) = reduceFunc(values(ind), v)
}
}
- Iterator( (indexMap, (values.toSeq, bs)) )
+ Iterator( (indexMap, (values.toIndexedSeq, bs)) )
}, true).cache
// extract the index and the values
val index = groups.mapPartitions(_.map{ case (kMap, vAr) => kMap }, true)
- val values = groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
+ val values: RDD[(IndexedSeq[V], BitSet)] =
+ groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
new IndexedRDD[K,V](new RDDIndex(index), values)
}
@@ -580,7 +584,7 @@ object IndexedRDD {
}
// Use the index to build the new values table
- val values: RDD[ (Seq[C], BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
+ val values: RDD[ (IndexedSeq[C], BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
// There is only one map
val index = indexIter.next()
assert(!indexIter.hasNext())
diff --git a/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala
index 0310711d37..fd7c16089d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala
@@ -60,7 +60,7 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K
*/
override def flatMapValues[U: ClassManifest](f: V => TraversableOnce[U]): RDD[(K,U)] = {
val cleanF = self.index.rdd.context.clean(f)
- val newValuesRDD = self.valuesRDD.mapPartitions(iter => iter.map{
+ val newValuesRDD: RDD[(IndexedSeq[U], BitSet)] = self.valuesRDD.mapPartitions(iter => iter.map{
case (values, bs) =>
val newValues = new Array[U](values.size)
val newBS = new BitSet(values.size)
@@ -71,7 +71,7 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K
newBS(ind) = true
}
}
- (newValues.toSeq, newBS)
+ (newValues.toIndexedSeq, newBS)
}, preservesPartitioning = true)
new IndexedRDD[K,U](self.index, newValuesRDD)
}
@@ -120,7 +120,7 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K
// then we simply merge the value RDDs.
// However it is possible that both RDDs are missing a value for a given key in
// which case the returned RDD should have a null value
- val newValues =
+ val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
self.valuesRDD.zipPartitions(other.valuesRDD){
(thisIter, otherIter) =>
val (thisValues, thisBS) = thisIter.next()
@@ -136,7 +136,7 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K
val b = if (otherBS(ind)) Seq(otherValues(ind)) else Seq.empty[W]
newValues(ind) = (a, b)
}
- Iterator((newValues.toSeq, newBS))
+ Iterator((newValues.toIndexedSeq, newBS))
}
new IndexedRDD(self.index, newValues)
}
@@ -166,7 +166,7 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K
List(newIndex).iterator
}).cache()
// Use the new index along with the this and the other indices to merge the values
- val newValues =
+ val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
newIndex.zipPartitions(self.tuples, other.tuples)(
(newIndexIter, thisTuplesIter, otherTuplesIter) => {
// Get the new index for this partition
@@ -199,7 +199,7 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K
newBS(ind) = true
}
}
- Iterator((newValues.toSeq, newBS))
+ Iterator((newValues.toIndexedSeq, newBS))
})
new IndexedRDD(new RDDIndex(newIndex), newValues)
}
@@ -262,12 +262,13 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K
// case null => null
// case (s, ab) => Seq((s, ab.toSeq))
// }.toSeq
- Iterator( (newIndex, (newValues.toSeq, newBS)) )
+ Iterator( (newIndex, (newValues.toIndexedSeq, newBS)) )
}).cache()
// Extract the index and values from the above RDD
val newIndex = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true)
- val newValues = groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
+ val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
+ groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
new IndexedRDD[K, (Seq[V], Seq[W])](new RDDIndex(newIndex), newValues)
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index ce1b9467c4..413177b2da 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -545,7 +545,7 @@ object GraphImpl {
val newValuesRDD = replicationMap.valuesRDD.zipPartitions(msgsByPartition){
(mapIter, msgsIter) =>
- val (Seq(vidToIndex), bs) = mapIter.next()
+ val (IndexedSeq(vidToIndex), bs) = mapIter.next()
assert(!mapIter.hasNext)
// Populate the vertex array using the vidToIndex map
val vertexArray = new Array[VD](vidToIndex.size)
@@ -553,7 +553,7 @@ object GraphImpl {
val ind = vidToIndex(msg.data._1)
vertexArray(ind) = msg.data._2
}
- Iterator((Seq(vertexArray), bs))
+ Iterator((IndexedSeq(vertexArray), bs))
}
new IndexedRDD(replicationMap.index, newValuesRDD)