aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorluluorta <luluorta@gmail.com>2014-11-01 01:22:46 -0700
committerReynold Xin <rxin@databricks.com>2014-11-01 01:23:00 -0700
commit1b282cdfda13e057b9cd85e1d71847d366fe7fcb (patch)
tree0991e4b7c6e601736a2aac7599f014ce8f682b6b
parentabdb90bd7ef1f9d84d63b001b7a305c577b8e0f2 (diff)
downloadspark-1b282cdfda13e057b9cd85e1d71847d366fe7fcb.tar.gz
spark-1b282cdfda13e057b9cd85e1d71847d366fe7fcb.tar.bz2
spark-1b282cdfda13e057b9cd85e1d71847d366fe7fcb.zip
[SPARK-4115][GraphX] Add overridden count for edge counting of EdgeRDD.
Accumulate sizes of all the EdgePartitions just like the VertexRDD. Author: luluorta <luluorta@gmail.com> Closes #2975 from luluorta/graph-edge-count and squashes the following commits: 86ef0e5 [luluorta] Add overrided count for edge counting of EdgeRDD. (cherry picked from commit ee29ef3800438501e0ff207feb00a28973fc0769) Signed-off-by: Reynold Xin <rxin@databricks.com>
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala7
1 files changed, 6 insertions, 1 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 899a3cbd62..65c2b09c4d 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -74,12 +74,17 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
this
}
- /** Persists the vertex partitions using `targetStorageLevel`, which defaults to MEMORY_ONLY. */
+ /** Persists the edge partitions using `targetStorageLevel`, which defaults to MEMORY_ONLY. */
override def cache(): this.type = {
partitionsRDD.persist(targetStorageLevel)
this
}
+ /** The number of edges in the RDD, summed over all edge partitions; 0 when there are none. */
+ override def count(): Long = {
+ partitionsRDD.map(_._2.size.toLong).fold(0L)(_ + _)
+ }
+
private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = {
this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>