aboutsummaryrefslogtreecommitdiff
path: root/graphx/src
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2014-11-12 13:44:49 -0800
committerReynold Xin <rxin@databricks.com>2014-11-12 13:44:49 -0800
commit0402be90f7af82c8404cafbca79f5f9fb8e2bbed (patch)
treedc3a3f5384dc55f52e965ad79c56e49b8f8ddac1 /graphx/src
parentaa43a8da012cf0dac7c7fcccde5f028a942599f0 (diff)
downloadspark-0402be90f7af82c8404cafbca79f5f9fb8e2bbed.tar.gz
spark-0402be90f7af82c8404cafbca79f5f9fb8e2bbed.tar.bz2
spark-0402be90f7af82c8404cafbca79f5f9fb8e2bbed.zip
Internal cleanup for aggregateMessages
1. Add EdgeActiveness enum to represent activeness criteria more cleanly than using booleans. 2. Comments and whitespace. Author: Ankur Dave <ankurdave@gmail.com> Closes #3231 from ankurdave/aggregateMessages-followup and squashes the following commits: 3d485c3 [Ankur Dave] Internal cleanup for aggregateMessages
Diffstat (limited to 'graphx/src')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/Graph.scala3
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeActiveness.java34
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala52
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala14
4 files changed, 69 insertions, 34 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index e0ba9403ba..2c1b9518a3 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -207,8 +207,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* }}}
*
*/
- def mapTriplets[ED2: ClassTag](
- map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
+ def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
mapTriplets((pid, iter) => iter.map(map), TripletFields.All)
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeActiveness.java b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeActiveness.java
new file mode 100644
index 0000000000..377ae849f0
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeActiveness.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl;
+
+/**
+ * Criteria for filtering edges based on activeness. For internal use only.
+ */
+public enum EdgeActiveness {
+ /** Neither the source vertex nor the destination vertex need be active. */
+ Neither,
+ /** The source vertex must be active. */
+ SrcOnly,
+ /** The destination vertex must be active. */
+ DstOnly,
+ /** Both vertices must be active. */
+ Both,
+ /** At least one vertex must be active. */
+ Either
+}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
index 78d8ac24b5..373af75448 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -64,6 +64,7 @@ class EdgePartition[
activeSet: Option[VertexSet])
extends Serializable {
+ /** No-arg constructor for serialization. */
private def this() = this(null, null, null, null, null, null, null, null)
/** Return a new `EdgePartition` with the specified edge data. */
@@ -375,12 +376,7 @@ class EdgePartition[
* @param sendMsg generates messages to neighboring vertices of an edge
* @param mergeMsg the combiner applied to messages destined to the same vertex
* @param tripletFields which triplet fields `sendMsg` uses
- * @param srcMustBeActive if true, edges will only be considered if their source vertex is in the
- * active set
- * @param dstMustBeActive if true, edges will only be considered if their destination vertex is in
- * the active set
- * @param maySatisfyEither if true, only one vertex need be in the active set for an edge to be
- * considered
+ * @param activeness criteria for filtering edges based on activeness
*
* @return iterator aggregated messages keyed by the receiving vertex id
*/
@@ -388,9 +384,7 @@ class EdgePartition[
sendMsg: EdgeContext[VD, ED, A] => Unit,
mergeMsg: (A, A) => A,
tripletFields: TripletFields,
- srcMustBeActive: Boolean,
- dstMustBeActive: Boolean,
- maySatisfyEither: Boolean): Iterator[(VertexId, A)] = {
+ activeness: EdgeActiveness): Iterator[(VertexId, A)] = {
val aggregates = new Array[A](vertexAttrs.length)
val bitset = new BitSet(vertexAttrs.length)
@@ -401,10 +395,13 @@ class EdgePartition[
val srcId = local2global(localSrcId)
val localDstId = localDstIds(i)
val dstId = local2global(localDstId)
- val srcIsActive = !srcMustBeActive || isActive(srcId)
- val dstIsActive = !dstMustBeActive || isActive(dstId)
val edgeIsActive =
- if (maySatisfyEither) srcIsActive || dstIsActive else srcIsActive && dstIsActive
+ if (activeness == EdgeActiveness.Neither) true
+ else if (activeness == EdgeActiveness.SrcOnly) isActive(srcId)
+ else if (activeness == EdgeActiveness.DstOnly) isActive(dstId)
+ else if (activeness == EdgeActiveness.Both) isActive(srcId) && isActive(dstId)
+ else if (activeness == EdgeActiveness.Either) isActive(srcId) || isActive(dstId)
+ else throw new Exception("unreachable")
if (edgeIsActive) {
val srcAttr = if (tripletFields.useSrc) vertexAttrs(localSrcId) else null.asInstanceOf[VD]
val dstAttr = if (tripletFields.useDst) vertexAttrs(localDstId) else null.asInstanceOf[VD]
@@ -424,12 +421,7 @@ class EdgePartition[
* @param sendMsg generates messages to neighboring vertices of an edge
* @param mergeMsg the combiner applied to messages destined to the same vertex
* @param tripletFields which triplet fields `sendMsg` uses
- * @param srcMustBeActive if true, edges will only be considered if their source vertex is in the
- * active set
- * @param dstMustBeActive if true, edges will only be considered if their destination vertex is in
- * the active set
- * @param maySatisfyEither if true, only one vertex need be in the active set for an edge to be
- * considered
+ * @param activeness criteria for filtering edges based on activeness
*
* @return iterator aggregated messages keyed by the receiving vertex id
*/
@@ -437,9 +429,7 @@ class EdgePartition[
sendMsg: EdgeContext[VD, ED, A] => Unit,
mergeMsg: (A, A) => A,
tripletFields: TripletFields,
- srcMustBeActive: Boolean,
- dstMustBeActive: Boolean,
- maySatisfyEither: Boolean): Iterator[(VertexId, A)] = {
+ activeness: EdgeActiveness): Iterator[(VertexId, A)] = {
val aggregates = new Array[A](vertexAttrs.length)
val bitset = new BitSet(vertexAttrs.length)
@@ -448,8 +438,16 @@ class EdgePartition[
val clusterSrcId = cluster._1
val clusterPos = cluster._2
val clusterLocalSrcId = localSrcIds(clusterPos)
- val srcIsActive = !srcMustBeActive || isActive(clusterSrcId)
- if (srcIsActive || maySatisfyEither) {
+
+ val scanCluster =
+ if (activeness == EdgeActiveness.Neither) true
+ else if (activeness == EdgeActiveness.SrcOnly) isActive(clusterSrcId)
+ else if (activeness == EdgeActiveness.DstOnly) true
+ else if (activeness == EdgeActiveness.Both) isActive(clusterSrcId)
+ else if (activeness == EdgeActiveness.Either) true
+ else throw new Exception("unreachable")
+
+ if (scanCluster) {
var pos = clusterPos
val srcAttr =
if (tripletFields.useSrc) vertexAttrs(clusterLocalSrcId) else null.asInstanceOf[VD]
@@ -457,9 +455,13 @@ class EdgePartition[
while (pos < size && localSrcIds(pos) == clusterLocalSrcId) {
val localDstId = localDstIds(pos)
val dstId = local2global(localDstId)
- val dstIsActive = !dstMustBeActive || isActive(dstId)
val edgeIsActive =
- if (maySatisfyEither) srcIsActive || dstIsActive else srcIsActive && dstIsActive
+ if (activeness == EdgeActiveness.Neither) true
+ else if (activeness == EdgeActiveness.SrcOnly) true
+ else if (activeness == EdgeActiveness.DstOnly) isActive(dstId)
+ else if (activeness == EdgeActiveness.Both) isActive(dstId)
+ else if (activeness == EdgeActiveness.Either) isActive(clusterSrcId) || isActive(dstId)
+ else throw new Exception("unreachable")
if (edgeIsActive) {
val dstAttr =
if (tripletFields.useDst) vertexAttrs(localDstId) else null.asInstanceOf[VD]
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index a1fe158b7b..2b4636a6c6 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -218,30 +218,30 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
case Some(EdgeDirection.Both) =>
if (activeFraction < 0.8) {
edgePartition.aggregateMessagesIndexScan(sendMsg, mergeMsg, tripletFields,
- true, true, false)
+ EdgeActiveness.Both)
} else {
edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
- true, true, false)
+ EdgeActiveness.Both)
}
case Some(EdgeDirection.Either) =>
// TODO: Because we only have a clustered index on the source vertex ID, we can't filter
// the index here. Instead we have to scan all edges and then do the filter.
edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
- true, true, true)
+ EdgeActiveness.Either)
case Some(EdgeDirection.Out) =>
if (activeFraction < 0.8) {
edgePartition.aggregateMessagesIndexScan(sendMsg, mergeMsg, tripletFields,
- true, false, false)
+ EdgeActiveness.SrcOnly)
} else {
edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
- true, false, false)
+ EdgeActiveness.SrcOnly)
}
case Some(EdgeDirection.In) =>
edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
- false, true, false)
+ EdgeActiveness.DstOnly)
case _ => // None
edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
- false, false, false)
+ EdgeActiveness.Neither)
}
}).setName("GraphImpl.aggregateMessages - preAgg")