aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala7
1 files changed, 6 insertions, 1 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 5bcb96b136..5267560b3e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -82,12 +82,17 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
this
}
- /** Persists the vertex partitions using `targetStorageLevel`, which defaults to MEMORY_ONLY. */
+ /** Persists the edge partitions using `targetStorageLevel`, which defaults to MEMORY_ONLY. */
override def cache(): this.type = {
partitionsRDD.persist(targetStorageLevel)
this
}
+ /** The number of edges in the RDD. */
+ override def count(): Long = {
+ partitionsRDD.map(_._2.size.toLong).reduce(_ + _)
+ }
+
private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = {
this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>