From 0402be90f7af82c8404cafbca79f5f9fb8e2bbed Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Wed, 12 Nov 2014 13:44:49 -0800 Subject: Internal cleanup for aggregateMessages 1. Add EdgeActiveness enum to represent activeness criteria more cleanly than using booleans. 2. Comments and whitespace. Author: Ankur Dave Closes #3231 from ankurdave/aggregateMessages-followup and squashes the following commits: 3d485c3 [Ankur Dave] Internal cleanup for aggregateMessages --- .../main/scala/org/apache/spark/graphx/Graph.scala | 3 +- .../apache/spark/graphx/impl/EdgeActiveness.java | 34 ++++++++++++++ .../apache/spark/graphx/impl/EdgePartition.scala | 52 +++++++++++----------- .../org/apache/spark/graphx/impl/GraphImpl.scala | 14 +++--- 4 files changed, 69 insertions(+), 34 deletions(-) create mode 100644 graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeActiveness.java (limited to 'graphx/src') diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index e0ba9403ba..2c1b9518a3 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -207,8 +207,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab * }}} * */ - def mapTriplets[ED2: ClassTag]( - map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = { + def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = { mapTriplets((pid, iter) => iter.map(map), TripletFields.All) } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeActiveness.java b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeActiveness.java new file mode 100644 index 0000000000..377ae849f0 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeActiveness.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.graphx.impl; + +/** + * Criteria for filtering edges based on activeness. For internal use only. + */ +public enum EdgeActiveness { + /** Neither the source vertex nor the destination vertex need be active. */ + Neither, + /** The source vertex must be active. */ + SrcOnly, + /** The destination vertex must be active. */ + DstOnly, + /** Both vertices must be active. */ + Both, + /** At least one vertex must be active. */ + Either +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala index 78d8ac24b5..373af75448 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala @@ -64,6 +64,7 @@ class EdgePartition[ activeSet: Option[VertexSet]) extends Serializable { + /** No-arg constructor for serialization. */ private def this() = this(null, null, null, null, null, null, null, null) /** Return a new `EdgePartition` with the specified edge data. */ @@ -375,12 +376,7 @@ class EdgePartition[ * @param sendMsg generates messages to neighboring vertices of an edge * @param mergeMsg the combiner applied to messages destined to the same vertex * @param tripletFields which triplet fields `sendMsg` uses - * @param srcMustBeActive if true, edges will only be considered if their source vertex is in the - * active set - * @param dstMustBeActive if true, edges will only be considered if their destination vertex is in - * the active set - * @param maySatisfyEither if true, only one vertex need be in the active set for an edge to be - * considered + * @param activeness criteria for filtering edges based on activeness * * @return iterator aggregated messages keyed by the receiving vertex id */ @@ -388,9 +384,7 @@ class EdgePartition[ sendMsg: EdgeContext[VD, ED, A] => Unit, mergeMsg: (A, A) => A, tripletFields: TripletFields, - srcMustBeActive: Boolean, - dstMustBeActive: Boolean, - maySatisfyEither: Boolean): Iterator[(VertexId, A)] = { + activeness: EdgeActiveness): Iterator[(VertexId, A)] = { val aggregates = new Array[A](vertexAttrs.length) val bitset = new BitSet(vertexAttrs.length) @@ -401,10 +395,13 @@ class EdgePartition[ val srcId = local2global(localSrcId) val localDstId = localDstIds(i) val dstId = local2global(localDstId) - val srcIsActive = !srcMustBeActive || isActive(srcId) - val dstIsActive = !dstMustBeActive || isActive(dstId) val edgeIsActive = - if (maySatisfyEither) srcIsActive || dstIsActive else srcIsActive && dstIsActive + if (activeness == EdgeActiveness.Neither) true + else if (activeness == EdgeActiveness.SrcOnly) isActive(srcId) + else if (activeness == EdgeActiveness.DstOnly) isActive(dstId) + else if (activeness == EdgeActiveness.Both) isActive(srcId) && isActive(dstId) + else if (activeness == EdgeActiveness.Either) isActive(srcId) || isActive(dstId) + else throw new Exception("unreachable") if (edgeIsActive) { val srcAttr = if (tripletFields.useSrc) vertexAttrs(localSrcId) else null.asInstanceOf[VD] val dstAttr = if (tripletFields.useDst) vertexAttrs(localDstId) else null.asInstanceOf[VD] @@ -424,12 +421,7 @@ class EdgePartition[ * @param sendMsg generates messages to neighboring vertices of an edge * @param mergeMsg the combiner applied to messages destined to the same vertex * @param tripletFields which triplet fields `sendMsg` uses - * @param srcMustBeActive if true, edges will only be considered if their source vertex is in the - * active set - * @param dstMustBeActive if true, edges will only be considered if their destination vertex is in - * the active set - * @param maySatisfyEither if true, only one vertex need be in the active set for an edge to be - * considered + * @param activeness criteria for filtering edges based on activeness * * @return iterator aggregated messages keyed by the receiving vertex id */ @@ -437,9 +429,7 @@ class EdgePartition[ sendMsg: EdgeContext[VD, ED, A] => Unit, mergeMsg: (A, A) => A, tripletFields: TripletFields, - srcMustBeActive: Boolean, - dstMustBeActive: Boolean, - maySatisfyEither: Boolean): Iterator[(VertexId, A)] = { + activeness: EdgeActiveness): Iterator[(VertexId, A)] = { val aggregates = new Array[A](vertexAttrs.length) val bitset = new BitSet(vertexAttrs.length) @@ -448,8 +438,16 @@ class EdgePartition[ val clusterSrcId = cluster._1 val clusterPos = cluster._2 val clusterLocalSrcId = localSrcIds(clusterPos) - val srcIsActive = !srcMustBeActive || isActive(clusterSrcId) - if (srcIsActive || maySatisfyEither) { + + val scanCluster = + if (activeness == EdgeActiveness.Neither) true + else if (activeness == EdgeActiveness.SrcOnly) isActive(clusterSrcId) + else if (activeness == EdgeActiveness.DstOnly) true + else if (activeness == EdgeActiveness.Both) isActive(clusterSrcId) + else if (activeness == EdgeActiveness.Either) true + else throw new Exception("unreachable") + + if (scanCluster) { var pos = clusterPos val srcAttr = if (tripletFields.useSrc) vertexAttrs(clusterLocalSrcId) else null.asInstanceOf[VD] @@ -457,9 +455,13 @@ class EdgePartition[ while (pos < size && localSrcIds(pos) == clusterLocalSrcId) { val localDstId = localDstIds(pos) val dstId = local2global(localDstId) - val dstIsActive = !dstMustBeActive || isActive(dstId) val edgeIsActive = - if (maySatisfyEither) srcIsActive || dstIsActive else srcIsActive && dstIsActive + if (activeness == EdgeActiveness.Neither) true + else if (activeness == EdgeActiveness.SrcOnly) true + else if (activeness == EdgeActiveness.DstOnly) isActive(dstId) + else if (activeness == EdgeActiveness.Both) isActive(dstId) + else if (activeness == EdgeActiveness.Either) isActive(clusterSrcId) || isActive(dstId) + else throw new Exception("unreachable") if (edgeIsActive) { val dstAttr = if (tripletFields.useDst) vertexAttrs(localDstId) else null.asInstanceOf[VD] diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index a1fe158b7b..2b4636a6c6 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -218,30 +218,30 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( case Some(EdgeDirection.Both) => if (activeFraction < 0.8) { edgePartition.aggregateMessagesIndexScan(sendMsg, mergeMsg, tripletFields, - true, true, false) + EdgeActiveness.Both) } else { edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields, - true, true, false) + EdgeActiveness.Both) } case Some(EdgeDirection.Either) => // TODO: Because we only have a clustered index on the source vertex ID, we can't filter // the index here. Instead we have to scan all edges and then do the filter. edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields, - true, true, true) + EdgeActiveness.Either) case Some(EdgeDirection.Out) => if (activeFraction < 0.8) { edgePartition.aggregateMessagesIndexScan(sendMsg, mergeMsg, tripletFields, - true, false, false) + EdgeActiveness.SrcOnly) } else { edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields, - true, false, false) + EdgeActiveness.SrcOnly) } case Some(EdgeDirection.In) => edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields, - false, true, false) + EdgeActiveness.DstOnly) case _ => // None edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields, - false, false, false) + EdgeActiveness.Neither) } }).setName("GraphImpl.aggregateMessages - preAgg") -- cgit v1.2.3