diff options
author | Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com> | 2013-12-31 21:37:51 -0800 |
---|---|---|
committer | Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com> | 2013-12-31 21:37:51 -0800 |
commit | 2f2524fd11876a19e315f994020877564a8a0df8 (patch) | |
tree | b444f923e0b5e21470e4a3a8bf7c10f927b79aae /graph/src | |
parent | 3d93d7339626bec62d514cd524e45084a22715ae (diff) | |
download | spark-2f2524fd11876a19e315f994020877564a8a0df8.tar.gz spark-2f2524fd11876a19e315f994020877564a8a0df8.tar.bz2 spark-2f2524fd11876a19e315f994020877564a8a0df8.zip |
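Fix `compute` in EdgeRDD and VertexRDD to call `iterator` on the parent RDD instead of invoking the parent's `compute` directly, so that a cached or checkpointed parent partition is reused rather than recomputed.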
Diffstat (limited to 'graph/src')
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala | 8 | ||||
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala | 4 |
2 files changed, 6 insertions, 6 deletions
```diff
diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
index b1640bf9ce..9aa76c9394 100644
--- a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
@@ -22,9 +22,8 @@ class EdgeRDD[@specialized ED: ClassManifest](
   override val partitioner =
     partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
 
-  override def compute(split: Partition, context: TaskContext): Iterator[Edge[ED]] = {
-    val edgePartition = partitionsRDD.compute(split, context).next()._2
-    edgePartition.iterator
+  override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
+    firstParent[(Pid, EdgePartition[ED])].iterator(part, context).next._2.iterator
   }
 
   override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
@@ -33,7 +32,7 @@ class EdgeRDD[@specialized ED: ClassManifest](
    * Caching a VertexRDD causes the index and values to be cached separately.
    */
   override def persist(newLevel: StorageLevel): EdgeRDD[ED] = {
-    super.persist(newLevel)
+    partitionsRDD.persist(newLevel)
     this
   }
 
@@ -45,6 +44,7 @@ class EdgeRDD[@specialized ED: ClassManifest](
 
   def mapEdgePartitions[ED2: ClassManifest](f: EdgePartition[ED] => EdgePartition[ED2])
     : EdgeRDD[ED2] = {
+// iter => iter.map { case (pid, ep) => (pid, f(ep)) }
     new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
       val (pid, ep) = iter.next()
       Iterator(Tuple2(pid, f(ep)))
diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala
index 671cf496f8..c274e342c7 100644
--- a/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala
@@ -86,7 +86,7 @@ class VertexRDD[@specialized VD: ClassManifest](
    * Caching a VertexRDD causes the index and values to be cached separately.
    */
   override def persist(newLevel: StorageLevel): VertexRDD[VD] = {
-    super.persist(newLevel)
+    partitionsRDD.persist(newLevel)
     this
   }
 
@@ -105,7 +105,7 @@ class VertexRDD[@specialized VD: ClassManifest](
    * Provide the `RDD[(Vid, VD)]` equivalent output.
    */
   override def compute(part: Partition, context: TaskContext): Iterator[(Vid, VD)] = {
-    partitionsRDD.compute(part, context).next().iterator
+    firstParent[VertexPartition[VD]].iterator(part, context).next.iterator
   }
 
   /**
```