author    Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com>  2013-12-31 21:37:51 -0800
committer Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com>  2013-12-31 21:37:51 -0800
commit    2f2524fd11876a19e315f994020877564a8a0df8 (patch)
tree      b444f923e0b5e21470e4a3a8bf7c10f927b79aae /graph/src
parent    3d93d7339626bec62d514cd524e45084a22715ae (diff)
Addressing issue in compute where compute is invoked instead of iterator on the parent RDD.
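The fix matters because RDD.compute() always rebuilds a partition from scratch, while RDD.iterator() first checks for a cached or checkpointed copy and only falls back to compute() on a miss. Calling compute() directly on the parent therefore silently ignores any persisted partitions. A minimal sketch of the corrected pattern, using a hypothetical FlattenedRDD wrapper (illustrative only, not the GraphX code) that stores one Seq[T] per parent partition, much as EdgeRDD stores one EdgePartition per partition:

    import org.apache.spark.{OneToOneDependency, Partition, TaskContext}
    import org.apache.spark.rdd.RDD

    // Hypothetical wrapper: one Seq[T] per parent partition, flattened on read.
    class FlattenedRDD[T: ClassManifest](parts: RDD[Seq[T]])
      extends RDD[T](parts.context, List(new OneToOneDependency(parts))) {

      override protected def getPartitions: Array[Partition] = parts.partitions

      override def compute(split: Partition, context: TaskContext): Iterator[T] = {
        // parts.compute(split, context) would recompute the parent partition even
        // when a cached copy exists; iterator() consults the cache/checkpoint first
        // and only falls back to compute() on a miss.
        firstParent[Seq[T]].iterator(split, context).next().iterator
      }
    }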
Diffstat (limited to 'graph/src')
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala   | 8
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala | 4
2 files changed, 6 insertions(+), 6 deletions(-)
diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
index b1640bf9ce..9aa76c9394 100644
--- a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
@@ -22,9 +22,8 @@ class EdgeRDD[@specialized ED: ClassManifest](
override val partitioner =
partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
- override def compute(split: Partition, context: TaskContext): Iterator[Edge[ED]] = {
- val edgePartition = partitionsRDD.compute(split, context).next()._2
- edgePartition.iterator
+ override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
+ firstParent[(Pid, EdgePartition[ED])].iterator(part, context).next._2.iterator
}
override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
@@ -33,7 +32,7 @@ class EdgeRDD[@specialized ED: ClassManifest](
* Caching a VertexRDD causes the index and values to be cached separately.
*/
override def persist(newLevel: StorageLevel): EdgeRDD[ED] = {
- super.persist(newLevel)
+ partitionsRDD.persist(newLevel)
this
}
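Delegating persist to partitionsRDD (instead of super.persist) caches the compact partition structures, and those cached blocks are exactly what firstParent[...].iterator(...) finds in compute above. A small sketch of the same idea on a generic wrapper; CachedWrapper and parts are illustrative names, not GraphX API:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.storage.StorageLevel

    // Illustrative wrapper: persisting the backing RDD means the cached blocks hold
    // the internal partition structures that compute() later reads via iterator().
    class CachedWrapper[T: ClassManifest](val parts: RDD[T]) {
      def persist(newLevel: StorageLevel): this.type = {
        parts.persist(newLevel)  // cache the internal representation
        this                     // return the wrapper for chaining, as EdgeRDD does
      }
    }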
@@ -45,6 +44,7 @@ class EdgeRDD[@specialized ED: ClassManifest](
def mapEdgePartitions[ED2: ClassManifest](f: EdgePartition[ED] => EdgePartition[ED2])
: EdgeRDD[ED2] = {
+// iter => iter.map { case (pid, ep) => (pid, f(ep)) }
new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
val (pid, ep) = iter.next()
Iterator(Tuple2(pid, f(ep)))
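The commented-out alternative and the explicit next()/Iterator(...) form only coincide because each partition of partitionsRDD is assumed to hold exactly one (pid, EdgePartition) pair. A plain-Scala sketch of the two shapes, with f and the tuple element types as stand-ins rather than GraphX types:

    // Handles any number of elements per partition.
    def viaMap[A, B](iter: Iterator[(Int, A)], f: A => B): Iterator[(Int, B)] =
      iter.map { case (pid, ep) => (pid, f(ep)) }

    // Relies on exactly one element per partition: throws on an empty iterator
    // and silently drops anything after the first element.
    def viaNext[A, B](iter: Iterator[(Int, A)], f: A => B): Iterator[(Int, B)] = {
      val (pid, ep) = iter.next()
      Iterator((pid, f(ep)))
    }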
diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala
index 671cf496f8..c274e342c7 100644
--- a/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala
@@ -86,7 +86,7 @@ class VertexRDD[@specialized VD: ClassManifest](
* Caching a VertexRDD causes the index and values to be cached separately.
*/
override def persist(newLevel: StorageLevel): VertexRDD[VD] = {
- super.persist(newLevel)
+ partitionsRDD.persist(newLevel)
this
}
@@ -105,7 +105,7 @@ class VertexRDD[@specialized VD: ClassManifest](
* Provide the `RDD[(Vid, VD)]` equivalent output.
*/
override def compute(part: Partition, context: TaskContext): Iterator[(Vid, VD)] = {
- partitionsRDD.compute(part, context).next().iterator
+ firstParent[VertexPartition[VD]].iterator(part, context).next.iterator
}
/**