Diffstat (limited to 'graphx')
 graphx/src/main/scala/org/apache/spark/graphx/EdgeContext.scala                   |  51
 graphx/src/main/scala/org/apache/spark/graphx/Graph.scala                         | 137
 graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala                      |  85
 graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java                  |  51
 graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala            | 378
 graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala     |  95
 graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala      |  84
 graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala                |  85
 graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala    |   8
 graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala                  |   6
 graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala               |  46
 graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala             |  19
 graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala                    |  19
 graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala       |  41
 graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala |  37
 15 files changed, 766 insertions(+), 376 deletions(-)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeContext.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeContext.scala
new file mode 100644
index 0000000000..f70715fca6
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeContext.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx
+
+/**
+ * Represents an edge along with its neighboring vertices and allows sending messages along the
+ * edge. Used in [[Graph#aggregateMessages]].
+ */
+abstract class EdgeContext[VD, ED, A] {
+ /** The vertex id of the edge's source vertex. */
+ def srcId: VertexId
+ /** The vertex id of the edge's destination vertex. */
+ def dstId: VertexId
+ /** The vertex attribute of the edge's source vertex. */
+ def srcAttr: VD
+ /** The vertex attribute of the edge's destination vertex. */
+ def dstAttr: VD
+ /** The attribute associated with the edge. */
+ def attr: ED
+
+ /** Sends a message to the source vertex. */
+ def sendToSrc(msg: A): Unit
+ /** Sends a message to the destination vertex. */
+ def sendToDst(msg: A): Unit
+
+ /** Converts the edge and vertex properties into an [[EdgeTriplet]] for convenience. */
+ def toEdgeTriplet: EdgeTriplet[VD, ED] = {
+ val et = new EdgeTriplet[VD, ED]
+ et.srcId = srcId
+ et.srcAttr = srcAttr
+ et.dstId = dstId
+ et.dstAttr = dstAttr
+ et.attr = attr
+ et
+ }
+}
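
For reference, a minimal sketch of how user code drives this interface through `aggregateMessages` (the graph `g` and its attribute types are illustrative assumptions, not part of this patch):

    import org.apache.spark.graphx._

    // Any Graph[VD, ED] works here, since the computation reads no attributes.
    val g: Graph[Int, Double] = ???

    // Each edge contributes 1 to both endpoints; mergeMsg sums the counts.
    val degrees: VertexRDD[Int] = g.aggregateMessages[Int](
      ctx => { ctx.sendToSrc(1); ctx.sendToDst(1) },  // sendMsg receives an EdgeContext
      _ + _,                                          // commutative, associative merge
      TripletFields.None)                             // no triplet fields are read
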
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index fa4b891754..e0ba9403ba 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -207,8 +207,39 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* }}}
*
*/
- def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
- mapTriplets((pid, iter) => iter.map(map))
+ def mapTriplets[ED2: ClassTag](
+ map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
+ mapTriplets((pid, iter) => iter.map(map), TripletFields.All)
+ }
+
+ /**
+ * Transforms each edge attribute using the map function, passing it the adjacent vertex
+ * attributes as well. If adjacent vertex values are not required,
+ * consider using `mapEdges` instead.
+ *
+ * @note This does not change the structure of the
+ * graph or modify the values of this graph. As a consequence
+ * the underlying index structures can be reused.
+ *
+ * @param map the function from an edge object to a new edge value.
+ * @param tripletFields which fields should be included in the edge triplet passed to the map
+ * function. If not all fields are needed, specifying this can improve performance.
+ *
+ * @tparam ED2 the new edge data type
+ *
+ * @example This function might be used to initialize edge
+ * attributes based on the attributes associated with each vertex.
+ * {{{
+ * val rawGraph: Graph[Int, Int] = someLoadFunction()
+ * val graph = rawGraph.mapTriplets[Int]( edge =>
+ *   edge.srcAttr - edge.dstAttr)
+ * }}}
+ *
+ */
+ def mapTriplets[ED2: ClassTag](
+ map: EdgeTriplet[VD, ED] => ED2,
+ tripletFields: TripletFields): Graph[VD, ED2] = {
+ mapTriplets((pid, iter) => iter.map(map), tripletFields)
}
/**
@@ -223,12 +254,15 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* the underlying index structures can be reused.
*
* @param map the iterator transform
+ * @param tripletFields which fields should be included in the edge triplet passed to the map
+ * function. If not all fields are needed, specifying this can improve performance.
*
* @tparam ED2 the new edge data type
*
*/
- def mapTriplets[ED2: ClassTag](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
- : Graph[VD, ED2]
+ def mapTriplets[ED2: ClassTag](
+ map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2],
+ tripletFields: TripletFields): Graph[VD, ED2]
/**
* Reverses all edges in the graph. If this graph contains an edge from a to b then the returned
@@ -287,6 +321,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* "sent" to either vertex in the edge. The `reduceFunc` is then used to combine the output of
* the map phase destined to each vertex.
*
+ * This function is deprecated in 1.2.0 because of SPARK-3936. Use aggregateMessages instead.
+ *
* @tparam A the type of "message" to be sent to each vertex
*
* @param mapFunc the user defined map function which returns 0 or
@@ -296,13 +332,15 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* be commutative and associative and is used to combine the output
* of the map phase
*
- * @param activeSetOpt optionally, a set of "active" vertices and a direction of edges to
- * consider when running `mapFunc`. If the direction is `In`, `mapFunc` will only be run on
- * edges with destination in the active set. If the direction is `Out`,
- * `mapFunc` will only be run on edges originating from vertices in the active set. If the
- * direction is `Either`, `mapFunc` will be run on edges with *either* vertex in the active set
- * . If the direction is `Both`, `mapFunc` will be run on edges with *both* vertices in the
- * active set. The active set must have the same index as the graph's vertices.
+ * @param activeSetOpt an efficient way to run the aggregation on a subset of the edges if
+ * desired. This is done by specifying a set of "active" vertices and an edge direction. The
+ * `sendMsg` function will then run only on edges connected to active vertices by edges in the
+ * specified direction. If the direction is `In`, `sendMsg` will only be run on edges with
+ * destination in the active set. If the direction is `Out`, `sendMsg` will only be run on edges
+ * originating from vertices in the active set. If the direction is `Either`, `sendMsg` will be
+ * run on edges with *either* vertex in the active set. If the direction is `Both`, `sendMsg`
+ * will be run on edges with *both* vertices in the active set. The active set must have the
+ * same index as the graph's vertices.
*
* @example We can use this function to compute the in-degree of each
* vertex
@@ -319,6 +357,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* predicate or implement PageRank.
*
*/
+ @deprecated("use aggregateMessages", "1.2.0")
def mapReduceTriplets[A: ClassTag](
mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
reduceFunc: (A, A) => A,
@@ -326,8 +365,80 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
: VertexRDD[A]
/**
- * Joins the vertices with entries in the `table` RDD and merges the results using `mapFunc`. The
- * input table should contain at most one entry for each vertex. If no entry in `other` is
+ * Aggregates values from the neighboring edges and vertices of each vertex. The user-supplied
+ * `sendMsg` function is invoked on each edge of the graph, generating 0 or more messages to be
+ * sent to either vertex in the edge. The `mergeMsg` function is then used to combine all messages
+ * destined to the same vertex.
+ *
+ * @tparam A the type of message to be sent to each vertex
+ *
+ * @param sendMsg runs on each edge, sending messages to neighboring vertices using the
+ * [[EdgeContext]].
+ * @param mergeMsg used to combine messages from `sendMsg` destined to the same vertex. This
+ * combiner should be commutative and associative.
+ * @param tripletFields which fields should be included in the [[EdgeContext]] passed to the
+ * `sendMsg` function. If not all fields are needed, specifying this can improve performance.
+ *
+ * @example We can use this function to compute the in-degree of each
+ * vertex
+ * {{{
+ * val rawGraph: Graph[_, _] = GraphLoader.edgeListFile(sc, "twittergraph")
+ * val inDeg: RDD[(VertexId, Int)] =
+ *   rawGraph.aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _)
+ * }}}
+ *
+ * @note By expressing computation at the edge level we achieve
+ * maximum parallelism. This is one of the core functions in the
+ * Graph API in that it enables neighborhood-level computation. For
+ * example, this function can be used to count neighbors satisfying a
+ * predicate or implement PageRank.
+ *
+ */
+ def aggregateMessages[A: ClassTag](
+ sendMsg: EdgeContext[VD, ED, A] => Unit,
+ mergeMsg: (A, A) => A,
+ tripletFields: TripletFields = TripletFields.All)
+ : VertexRDD[A] = {
+ aggregateMessagesWithActiveSet(sendMsg, mergeMsg, tripletFields, None)
+ }
+
+ /**
+ * Aggregates values from the neighboring edges and vertices of each vertex. The user-supplied
+ * `sendMsg` function is invoked on each edge of the graph, generating 0 or more messages to be
+ * sent to either vertex in the edge. The `mergeMsg` function is then used to combine all messages
+ * destined to the same vertex.
+ *
+ * This variant can take an active set to restrict the computation and is intended for internal
+ * use only.
+ *
+ * @tparam A the type of message to be sent to each vertex
+ *
+ * @param sendMsg runs on each edge, sending messages to neighboring vertices using the
+ * [[EdgeContext]].
+ * @param mergeMsg used to combine messages from `sendMsg` destined to the same vertex. This
+ * combiner should be commutative and associative.
+ * @param tripletFields which fields should be included in the [[EdgeContext]] passed to the
+ * `sendMsg` function. If not all fields are needed, specifying this can improve performance.
+ * @param activeSetOpt an efficient way to run the aggregation on a subset of the edges if
+ * desired. This is done by specifying a set of "active" vertices and an edge direction. The
+ * `sendMsg` function will then run only on edges connected to active vertices by edges in the
+ * specified direction. If the direction is `In`, `sendMsg` will only be run on edges with
+ * destination in the active set. If the direction is `Out`, `sendMsg` will only be run on edges
+ * originating from vertices in the active set. If the direction is `Either`, `sendMsg` will be
+ * run on edges with *either* vertex in the active set. If the direction is `Both`, `sendMsg`
+ * will be run on edges with *both* vertices in the active set. The active set must have the
+ * same index as the graph's vertices.
+ */
+ private[graphx] def aggregateMessagesWithActiveSet[A: ClassTag](
+ sendMsg: EdgeContext[VD, ED, A] => Unit,
+ mergeMsg: (A, A) => A,
+ tripletFields: TripletFields,
+ activeSetOpt: Option[(VertexRDD[_], EdgeDirection)])
+ : VertexRDD[A]
+
+ /**
+ * Joins the vertices with entries in the `table` RDD and merges the results using `mapFunc`.
+ * The input table should contain at most one entry for each vertex. If no entry in `other` is
* provided for a particular vertex in the graph, the map function receives `None`.
*
* @tparam U the type of entry in the table of updates
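
Making the new in-degree example concrete end to end (a sketch; the SparkContext `sc` and the toy edge list are assumed for illustration):

    import org.apache.spark.SparkContext
    import org.apache.spark.graphx._

    val sc: SparkContext = ???  // assumed to be available
    val edges = sc.parallelize(Seq(Edge(1L, 2L, 0), Edge(3L, 2L, 0), Edge(2L, 1L, 0)))
    val rawGraph: Graph[Int, Int] = Graph.fromEdges(edges, defaultValue = 0)

    // Each edge sends 1 to its destination; merging with + yields in-degrees:
    // vertex 2 receives two messages and vertex 1 receives one.
    val inDeg: VertexRDD[Int] =
      rawGraph.aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _, TripletFields.None)
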
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index d0dd45dba6..d5150382d5 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -69,11 +69,12 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
*/
private def degreesRDD(edgeDirection: EdgeDirection): VertexRDD[Int] = {
if (edgeDirection == EdgeDirection.In) {
- graph.mapReduceTriplets(et => Iterator((et.dstId,1)), _ + _)
+ graph.aggregateMessages(_.sendToDst(1), _ + _, TripletFields.None)
} else if (edgeDirection == EdgeDirection.Out) {
- graph.mapReduceTriplets(et => Iterator((et.srcId,1)), _ + _)
+ graph.aggregateMessages(_.sendToSrc(1), _ + _, TripletFields.None)
} else { // EdgeDirection.Either
- graph.mapReduceTriplets(et => Iterator((et.srcId,1), (et.dstId,1)), _ + _)
+ graph.aggregateMessages(ctx => { ctx.sendToSrc(1); ctx.sendToDst(1) }, _ + _,
+ TripletFields.None)
}
}
@@ -88,18 +89,17 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]] = {
val nbrs =
if (edgeDirection == EdgeDirection.Either) {
- graph.mapReduceTriplets[Array[VertexId]](
- mapFunc = et => Iterator((et.srcId, Array(et.dstId)), (et.dstId, Array(et.srcId))),
- reduceFunc = _ ++ _
- )
+ graph.aggregateMessages[Array[VertexId]](
+ ctx => { ctx.sendToSrc(Array(ctx.dstId)); ctx.sendToDst(Array(ctx.srcId)) },
+ _ ++ _, TripletFields.None)
} else if (edgeDirection == EdgeDirection.Out) {
- graph.mapReduceTriplets[Array[VertexId]](
- mapFunc = et => Iterator((et.srcId, Array(et.dstId))),
- reduceFunc = _ ++ _)
+ graph.aggregateMessages[Array[VertexId]](
+ ctx => ctx.sendToSrc(Array(ctx.dstId)),
+ _ ++ _, TripletFields.None)
} else if (edgeDirection == EdgeDirection.In) {
- graph.mapReduceTriplets[Array[VertexId]](
- mapFunc = et => Iterator((et.dstId, Array(et.srcId))),
- reduceFunc = _ ++ _)
+ graph.aggregateMessages[Array[VertexId]](
+ ctx => ctx.sendToDst(Array(ctx.srcId)),
+ _ ++ _, TripletFields.None)
} else {
throw new SparkException("It doesn't make sense to collect neighbor ids without a " +
"direction. (EdgeDirection.Both is not supported; use EdgeDirection.Either instead.)")
@@ -122,22 +122,27 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
* @return the vertex set of neighboring vertex attributes for each vertex
*/
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]] = {
- val nbrs = graph.mapReduceTriplets[Array[(VertexId,VD)]](
- edge => {
- val msgToSrc = (edge.srcId, Array((edge.dstId, edge.dstAttr)))
- val msgToDst = (edge.dstId, Array((edge.srcId, edge.srcAttr)))
- edgeDirection match {
- case EdgeDirection.Either => Iterator(msgToSrc, msgToDst)
- case EdgeDirection.In => Iterator(msgToDst)
- case EdgeDirection.Out => Iterator(msgToSrc)
- case EdgeDirection.Both =>
- throw new SparkException("collectNeighbors does not support EdgeDirection.Both. Use" +
- "EdgeDirection.Either instead.")
- }
- },
- (a, b) => a ++ b)
-
- graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) =>
+ val nbrs = edgeDirection match {
+ case EdgeDirection.Either =>
+ graph.aggregateMessages[Array[(VertexId,VD)]](
+ ctx => {
+ ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr)))
+ ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))
+ },
+ (a, b) => a ++ b, TripletFields.SrcDstOnly)
+ case EdgeDirection.In =>
+ graph.aggregateMessages[Array[(VertexId,VD)]](
+ ctx => ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr))),
+ (a, b) => a ++ b, TripletFields.SrcOnly)
+ case EdgeDirection.Out =>
+ graph.aggregateMessages[Array[(VertexId,VD)]](
+ ctx => ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr))),
+ (a, b) => a ++ b, TripletFields.DstOnly)
+ case EdgeDirection.Both =>
+        throw new SparkException("collectNeighbors does not support EdgeDirection.Both. Use " +
+          "EdgeDirection.Either instead.")
+ }
+ graph.vertices.leftJoin(nbrs) { (vid, vdata, nbrsOpt) =>
nbrsOpt.getOrElse(Array.empty[(VertexId, VD)])
}
} // end of collectNeighbor
@@ -160,18 +165,20 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
def collectEdges(edgeDirection: EdgeDirection): VertexRDD[Array[Edge[ED]]] = {
edgeDirection match {
case EdgeDirection.Either =>
- graph.mapReduceTriplets[Array[Edge[ED]]](
- edge => Iterator((edge.srcId, Array(new Edge(edge.srcId, edge.dstId, edge.attr))),
- (edge.dstId, Array(new Edge(edge.srcId, edge.dstId, edge.attr)))),
- (a, b) => a ++ b)
+ graph.aggregateMessages[Array[Edge[ED]]](
+ ctx => {
+ ctx.sendToSrc(Array(new Edge(ctx.srcId, ctx.dstId, ctx.attr)))
+ ctx.sendToDst(Array(new Edge(ctx.srcId, ctx.dstId, ctx.attr)))
+ },
+ (a, b) => a ++ b, TripletFields.EdgeOnly)
case EdgeDirection.In =>
- graph.mapReduceTriplets[Array[Edge[ED]]](
- edge => Iterator((edge.dstId, Array(new Edge(edge.srcId, edge.dstId, edge.attr)))),
- (a, b) => a ++ b)
+ graph.aggregateMessages[Array[Edge[ED]]](
+ ctx => ctx.sendToDst(Array(new Edge(ctx.srcId, ctx.dstId, ctx.attr))),
+ (a, b) => a ++ b, TripletFields.EdgeOnly)
case EdgeDirection.Out =>
- graph.mapReduceTriplets[Array[Edge[ED]]](
- edge => Iterator((edge.srcId, Array(new Edge(edge.srcId, edge.dstId, edge.attr)))),
- (a, b) => a ++ b)
+ graph.aggregateMessages[Array[Edge[ED]]](
+ ctx => ctx.sendToSrc(Array(new Edge(ctx.srcId, ctx.dstId, ctx.attr))),
+ (a, b) => a ++ b, TripletFields.EdgeOnly)
case EdgeDirection.Both =>
        throw new SparkException("collectEdges does not support EdgeDirection.Both. Use " +
          "EdgeDirection.Either instead.")
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java b/graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java
new file mode 100644
index 0000000000..34df4b7ee7
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/TripletFields.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx;
+
+import java.io.Serializable;
+
+/**
+ * Represents a subset of the fields of an [[EdgeTriplet]] or [[EdgeContext]]. This allows the
+ * system to populate only those fields for efficiency.
+ */
+public class TripletFields implements Serializable {
+ public final boolean useSrc;
+ public final boolean useDst;
+ public final boolean useEdge;
+
+ public TripletFields() {
+ this(true, true, true);
+ }
+
+ public TripletFields(boolean useSrc, boolean useDst, boolean useEdge) {
+ this.useSrc = useSrc;
+ this.useDst = useDst;
+ this.useEdge = useEdge;
+ }
+
+ public static final TripletFields None = new TripletFields(false, false, false);
+ public static final TripletFields EdgeOnly = new TripletFields(false, false, true);
+ public static final TripletFields SrcOnly = new TripletFields(true, false, false);
+ public static final TripletFields DstOnly = new TripletFields(false, true, false);
+ public static final TripletFields SrcDstOnly = new TripletFields(true, true, false);
+ public static final TripletFields SrcAndEdge = new TripletFields(true, false, true);
+ public static final TripletFields Src = SrcAndEdge;
+ public static final TripletFields DstAndEdge = new TripletFields(false, true, true);
+ public static final TripletFields Dst = DstAndEdge;
+ public static final TripletFields All = new TripletFields(true, true, true);
+}
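
Choosing the smallest sufficient constant is what unlocks the optimization: GraphImpl uses `useSrc`/`useDst` to decide which vertex attributes to replicate to the edge partitions at all. For instance, a PageRank-style computation reads only the source attribute and the edge attribute (a sketch; `g: Graph[Double, Double]` is assumed):

    val g: Graph[Double, Double] = ???  // assumed for illustration

    // Only ctx.srcAttr and ctx.attr are read, so destination attributes
    // need never be shipped to the edge partitions.
    val contribs = g.aggregateMessages[Double](
      ctx => ctx.sendToDst(ctx.srcAttr * ctx.attr),
      _ + _,
      TripletFields.SrcAndEdge)
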
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
index a5c9cd1f8b..78d8ac24b5 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -21,63 +21,93 @@ import scala.reflect.{classTag, ClassTag}
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
+import org.apache.spark.util.collection.BitSet
/**
- * A collection of edges stored in columnar format, along with any vertex attributes referenced. The
- * edges are stored in 3 large columnar arrays (src, dst, attribute). The arrays are clustered by
- * src. There is an optional active vertex set for filtering computation on the edges.
+ * A collection of edges, along with referenced vertex attributes and an optional active vertex set
+ * for filtering computation on the edges.
+ *
+ * The edges are stored in columnar format in `localSrcIds`, `localDstIds`, and `data`. All
+ * referenced global vertex ids are mapped to a compact set of local vertex ids according to the
+ * `global2local` map. Each local vertex id is a valid index into `vertexAttrs`, which stores the
+ * corresponding vertex attribute, and `local2global`, which stores the reverse mapping to global
+ * vertex id. The global vertex ids that are active are optionally stored in `activeSet`.
+ *
+ * The edges are clustered by source vertex id, and the mapping from global vertex id to the index
+ * of the corresponding edge cluster is stored in `index`.
*
* @tparam ED the edge attribute type
* @tparam VD the vertex attribute type
*
- * @param srcIds the source vertex id of each edge
- * @param dstIds the destination vertex id of each edge
+ * @param localSrcIds the local source vertex id of each edge as an index into `local2global` and
+ * `vertexAttrs`
+ * @param localDstIds the local destination vertex id of each edge as an index into `local2global`
+ * and `vertexAttrs`
* @param data the attribute associated with each edge
- * @param index a clustered index on source vertex id
- * @param vertices a map from referenced vertex ids to their corresponding attributes. Must
- * contain all vertex ids from `srcIds` and `dstIds`, though not necessarily valid attributes for
- * those vertex ids. The mask is not used.
+ * @param index a clustered index on source vertex id as a map from each global source vertex id to
+ * the offset in the edge arrays where the cluster for that vertex id begins
+ * @param global2local a map from referenced vertex ids to local ids which index into vertexAttrs
+ * @param local2global an array of global vertex ids where the offsets are local vertex ids
+ * @param vertexAttrs an array of vertex attributes where the offsets are local vertex ids
* @param activeSet an optional active vertex set for filtering computation on the edges
*/
private[graphx]
class EdgePartition[
@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag, VD: ClassTag](
- val srcIds: Array[VertexId] = null,
- val dstIds: Array[VertexId] = null,
- val data: Array[ED] = null,
- val index: GraphXPrimitiveKeyOpenHashMap[VertexId, Int] = null,
- val vertices: VertexPartition[VD] = null,
- val activeSet: Option[VertexSet] = None
- ) extends Serializable {
+ localSrcIds: Array[Int],
+ localDstIds: Array[Int],
+ data: Array[ED],
+ index: GraphXPrimitiveKeyOpenHashMap[VertexId, Int],
+ global2local: GraphXPrimitiveKeyOpenHashMap[VertexId, Int],
+ local2global: Array[VertexId],
+ vertexAttrs: Array[VD],
+ activeSet: Option[VertexSet])
+ extends Serializable {
- /** Return a new `EdgePartition` with the specified edge data. */
- def withData[ED2: ClassTag](data_ : Array[ED2]): EdgePartition[ED2, VD] = {
- new EdgePartition(srcIds, dstIds, data_, index, vertices, activeSet)
- }
+ private def this() = this(null, null, null, null, null, null, null, null)
- /** Return a new `EdgePartition` with the specified vertex partition. */
- def withVertices[VD2: ClassTag](
- vertices_ : VertexPartition[VD2]): EdgePartition[ED, VD2] = {
- new EdgePartition(srcIds, dstIds, data, index, vertices_, activeSet)
+ /** Return a new `EdgePartition` with the specified edge data. */
+ def withData[ED2: ClassTag](data: Array[ED2]): EdgePartition[ED2, VD] = {
+ new EdgePartition(
+ localSrcIds, localDstIds, data, index, global2local, local2global, vertexAttrs, activeSet)
}
/** Return a new `EdgePartition` with the specified active set, provided as an iterator. */
def withActiveSet(iter: Iterator[VertexId]): EdgePartition[ED, VD] = {
- val newActiveSet = new VertexSet
- iter.foreach(newActiveSet.add(_))
- new EdgePartition(srcIds, dstIds, data, index, vertices, Some(newActiveSet))
- }
-
- /** Return a new `EdgePartition` with the specified active set. */
- def withActiveSet(activeSet_ : Option[VertexSet]): EdgePartition[ED, VD] = {
- new EdgePartition(srcIds, dstIds, data, index, vertices, activeSet_)
+ val activeSet = new VertexSet
+ while (iter.hasNext) { activeSet.add(iter.next()) }
+ new EdgePartition(
+ localSrcIds, localDstIds, data, index, global2local, local2global, vertexAttrs,
+ Some(activeSet))
}
/** Return a new `EdgePartition` with updates to vertex attributes specified in `iter`. */
def updateVertices(iter: Iterator[(VertexId, VD)]): EdgePartition[ED, VD] = {
- this.withVertices(vertices.innerJoinKeepLeft(iter))
+ val newVertexAttrs = new Array[VD](vertexAttrs.length)
+ System.arraycopy(vertexAttrs, 0, newVertexAttrs, 0, vertexAttrs.length)
+ while (iter.hasNext) {
+ val kv = iter.next()
+ newVertexAttrs(global2local(kv._1)) = kv._2
+ }
+ new EdgePartition(
+ localSrcIds, localDstIds, data, index, global2local, local2global, newVertexAttrs,
+ activeSet)
+ }
+
+ /** Return a new `EdgePartition` without any locally cached vertex attributes. */
+ def withoutVertexAttributes[VD2: ClassTag](): EdgePartition[ED, VD2] = {
+ val newVertexAttrs = new Array[VD2](vertexAttrs.length)
+ new EdgePartition(
+ localSrcIds, localDstIds, data, index, global2local, local2global, newVertexAttrs,
+ activeSet)
}
+ @inline private def srcIds(pos: Int): VertexId = local2global(localSrcIds(pos))
+
+ @inline private def dstIds(pos: Int): VertexId = local2global(localDstIds(pos))
+
+ @inline private def attrs(pos: Int): ED = data(pos)
+
/** Look up vid in activeSet, throwing an exception if it is None. */
def isActive(vid: VertexId): Boolean = {
activeSet.get.contains(vid)
@@ -92,11 +122,19 @@ class EdgePartition[
* @return a new edge partition with all edges reversed.
*/
def reverse: EdgePartition[ED, VD] = {
- val builder = new EdgePartitionBuilder(size)(classTag[ED], classTag[VD])
- for (e <- iterator) {
- builder.add(e.dstId, e.srcId, e.attr)
+ val builder = new ExistingEdgePartitionBuilder[ED, VD](
+ global2local, local2global, vertexAttrs, activeSet, size)
+ var i = 0
+ while (i < size) {
+ val localSrcId = localSrcIds(i)
+ val localDstId = localDstIds(i)
+ val srcId = local2global(localSrcId)
+ val dstId = local2global(localDstId)
+ val attr = data(i)
+ builder.add(dstId, srcId, localDstId, localSrcId, attr)
+ i += 1
}
- builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet)
+ builder.toEdgePartition
}
/**
@@ -157,13 +195,25 @@ class EdgePartition[
def filter(
epred: EdgeTriplet[VD, ED] => Boolean,
vpred: (VertexId, VD) => Boolean): EdgePartition[ED, VD] = {
- val filtered = tripletIterator().filter(et =>
- vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et))
- val builder = new EdgePartitionBuilder[ED, VD]
- for (e <- filtered) {
- builder.add(e.srcId, e.dstId, e.attr)
+ val builder = new ExistingEdgePartitionBuilder[ED, VD](
+ global2local, local2global, vertexAttrs, activeSet)
+ var i = 0
+ while (i < size) {
+ // The user sees the EdgeTriplet, so we can't reuse it and must create one per edge.
+ val localSrcId = localSrcIds(i)
+ val localDstId = localDstIds(i)
+ val et = new EdgeTriplet[VD, ED]
+ et.srcId = local2global(localSrcId)
+ et.dstId = local2global(localDstId)
+ et.srcAttr = vertexAttrs(localSrcId)
+ et.dstAttr = vertexAttrs(localDstId)
+ et.attr = data(i)
+ if (vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et)) {
+ builder.add(et.srcId, et.dstId, localSrcId, localDstId, et.attr)
+ }
+ i += 1
}
- builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet)
+ builder.toEdgePartition
}
/**
@@ -183,28 +233,40 @@ class EdgePartition[
* @return a new edge partition without duplicate edges
*/
def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED, VD] = {
- val builder = new EdgePartitionBuilder[ED, VD]
+ val builder = new ExistingEdgePartitionBuilder[ED, VD](
+ global2local, local2global, vertexAttrs, activeSet)
var currSrcId: VertexId = null.asInstanceOf[VertexId]
var currDstId: VertexId = null.asInstanceOf[VertexId]
+ var currLocalSrcId = -1
+ var currLocalDstId = -1
var currAttr: ED = null.asInstanceOf[ED]
+ // Iterate through the edges, accumulating runs of identical edges using the curr* variables and
+ // releasing them to the builder when we see the beginning of the next run
var i = 0
while (i < size) {
if (i > 0 && currSrcId == srcIds(i) && currDstId == dstIds(i)) {
+ // This edge should be accumulated into the existing run
currAttr = merge(currAttr, data(i))
} else {
+ // This edge starts a new run of edges
if (i > 0) {
- builder.add(currSrcId, currDstId, currAttr)
+ // First release the existing run to the builder
+ builder.add(currSrcId, currDstId, currLocalSrcId, currLocalDstId, currAttr)
}
+ // Then start accumulating for a new run
currSrcId = srcIds(i)
currDstId = dstIds(i)
+ currLocalSrcId = localSrcIds(i)
+ currLocalDstId = localDstIds(i)
currAttr = data(i)
}
i += 1
}
+ // Finally, release the last accumulated run
if (size > 0) {
- builder.add(currSrcId, currDstId, currAttr)
+ builder.add(currSrcId, currDstId, currLocalSrcId, currLocalDstId, currAttr)
}
- builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet)
+ builder.toEdgePartition
}
/**
@@ -220,7 +282,8 @@ class EdgePartition[
def innerJoin[ED2: ClassTag, ED3: ClassTag]
(other: EdgePartition[ED2, _])
(f: (VertexId, VertexId, ED, ED2) => ED3): EdgePartition[ED3, VD] = {
- val builder = new EdgePartitionBuilder[ED3, VD]
+ val builder = new ExistingEdgePartitionBuilder[ED3, VD](
+ global2local, local2global, vertexAttrs, activeSet)
var i = 0
var j = 0
// For i = index of each edge in `this`...
@@ -233,12 +296,13 @@ class EdgePartition[
while (j < other.size && other.srcIds(j) == srcId && other.dstIds(j) < dstId) { j += 1 }
if (j < other.size && other.srcIds(j) == srcId && other.dstIds(j) == dstId) {
// ... run `f` on the matching edge
- builder.add(srcId, dstId, f(srcId, dstId, this.data(i), other.data(j)))
+ builder.add(srcId, dstId, localSrcIds(i), localDstIds(i),
+ f(srcId, dstId, this.data(i), other.attrs(j)))
}
}
i += 1
}
- builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet)
+ builder.toEdgePartition
}
/**
@@ -246,7 +310,7 @@ class EdgePartition[
*
* @return size of the partition
*/
- val size: Int = srcIds.size
+ val size: Int = localSrcIds.size
/** The number of unique source vertices in the partition. */
def indexSize: Int = index.size
@@ -280,55 +344,197 @@ class EdgePartition[
* It is safe to keep references to the objects from this iterator.
*/
def tripletIterator(
- includeSrc: Boolean = true, includeDst: Boolean = true): Iterator[EdgeTriplet[VD, ED]] = {
- new EdgeTripletIterator(this, includeSrc, includeDst)
+ includeSrc: Boolean = true, includeDst: Boolean = true)
+ : Iterator[EdgeTriplet[VD, ED]] = new Iterator[EdgeTriplet[VD, ED]] {
+ private[this] var pos = 0
+
+ override def hasNext: Boolean = pos < EdgePartition.this.size
+
+ override def next() = {
+ val triplet = new EdgeTriplet[VD, ED]
+ val localSrcId = localSrcIds(pos)
+ val localDstId = localDstIds(pos)
+ triplet.srcId = local2global(localSrcId)
+ triplet.dstId = local2global(localDstId)
+ if (includeSrc) {
+ triplet.srcAttr = vertexAttrs(localSrcId)
+ }
+ if (includeDst) {
+ triplet.dstAttr = vertexAttrs(localDstId)
+ }
+ triplet.attr = data(pos)
+ pos += 1
+ triplet
+ }
}
/**
- * Upgrade the given edge iterator into a triplet iterator.
+ * Send messages along edges and aggregate them at the receiving vertices. Implemented by scanning
+ * all edges sequentially.
+ *
+ * @param sendMsg generates messages to neighboring vertices of an edge
+ * @param mergeMsg the combiner applied to messages destined to the same vertex
+ * @param tripletFields which triplet fields `sendMsg` uses
+ * @param srcMustBeActive if true, edges will only be considered if their source vertex is in the
+ * active set
+ * @param dstMustBeActive if true, edges will only be considered if their destination vertex is in
+ * the active set
+ * @param maySatisfyEither if true, only one vertex need be in the active set for an edge to be
+ * considered
*
- * Be careful not to keep references to the objects from this iterator. To improve GC performance
- * the same object is re-used in `next()`.
+ * @return an iterator over the aggregated messages, keyed by the receiving vertex id
*/
- def upgradeIterator(
- edgeIter: Iterator[Edge[ED]], includeSrc: Boolean = true, includeDst: Boolean = true)
- : Iterator[EdgeTriplet[VD, ED]] = {
- new ReusingEdgeTripletIterator(edgeIter, this, includeSrc, includeDst)
+ def aggregateMessagesEdgeScan[A: ClassTag](
+ sendMsg: EdgeContext[VD, ED, A] => Unit,
+ mergeMsg: (A, A) => A,
+ tripletFields: TripletFields,
+ srcMustBeActive: Boolean,
+ dstMustBeActive: Boolean,
+ maySatisfyEither: Boolean): Iterator[(VertexId, A)] = {
+ val aggregates = new Array[A](vertexAttrs.length)
+ val bitset = new BitSet(vertexAttrs.length)
+
+    val ctx = new AggregatingEdgeContext[VD, ED, A](mergeMsg, aggregates, bitset)
+ var i = 0
+ while (i < size) {
+ val localSrcId = localSrcIds(i)
+ val srcId = local2global(localSrcId)
+ val localDstId = localDstIds(i)
+ val dstId = local2global(localDstId)
+ val srcIsActive = !srcMustBeActive || isActive(srcId)
+ val dstIsActive = !dstMustBeActive || isActive(dstId)
+ val edgeIsActive =
+ if (maySatisfyEither) srcIsActive || dstIsActive else srcIsActive && dstIsActive
+ if (edgeIsActive) {
+ val srcAttr = if (tripletFields.useSrc) vertexAttrs(localSrcId) else null.asInstanceOf[VD]
+ val dstAttr = if (tripletFields.useDst) vertexAttrs(localDstId) else null.asInstanceOf[VD]
+ ctx.set(srcId, dstId, localSrcId, localDstId, srcAttr, dstAttr, data(i))
+ sendMsg(ctx)
+ }
+ i += 1
+ }
+
+ bitset.iterator.map { localId => (local2global(localId), aggregates(localId)) }
}
/**
- * Get an iterator over the edges in this partition whose source vertex ids match srcIdPred. The
- * iterator is generated using an index scan, so it is efficient at skipping edges that don't
- * match srcIdPred.
+ * Send messages along edges and aggregate them at the receiving vertices. Implemented by
+ * filtering the source vertex index, then scanning each edge cluster.
*
- * Be careful not to keep references to the objects from this iterator. To improve GC performance
- * the same object is re-used in `next()`.
- */
- def indexIterator(srcIdPred: VertexId => Boolean): Iterator[Edge[ED]] =
- index.iterator.filter(kv => srcIdPred(kv._1)).flatMap(Function.tupled(clusterIterator))
-
- /**
- * Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The
- * cluster must start at position `index`.
+ * @param sendMsg generates messages to neighboring vertices of an edge
+ * @param mergeMsg the combiner applied to messages destined to the same vertex
+ * @param tripletFields which triplet fields `sendMsg` uses
+ * @param srcMustBeActive if true, edges will only be considered if their source vertex is in the
+ * active set
+ * @param dstMustBeActive if true, edges will only be considered if their destination vertex is in
+ * the active set
+ * @param maySatisfyEither if true, only one vertex need be in the active set for an edge to be
+ * considered
*
- * Be careful not to keep references to the objects from this iterator. To improve GC performance
- * the same object is re-used in `next()`.
+ * @return an iterator over the aggregated messages, keyed by the receiving vertex id
*/
- private def clusterIterator(srcId: VertexId, index: Int) = new Iterator[Edge[ED]] {
- private[this] val edge = new Edge[ED]
- private[this] var pos = index
+ def aggregateMessagesIndexScan[A: ClassTag](
+ sendMsg: EdgeContext[VD, ED, A] => Unit,
+ mergeMsg: (A, A) => A,
+ tripletFields: TripletFields,
+ srcMustBeActive: Boolean,
+ dstMustBeActive: Boolean,
+ maySatisfyEither: Boolean): Iterator[(VertexId, A)] = {
+ val aggregates = new Array[A](vertexAttrs.length)
+ val bitset = new BitSet(vertexAttrs.length)
- override def hasNext: Boolean = {
- pos >= 0 && pos < EdgePartition.this.size && srcIds(pos) == srcId
+    val ctx = new AggregatingEdgeContext[VD, ED, A](mergeMsg, aggregates, bitset)
+ index.iterator.foreach { cluster =>
+ val clusterSrcId = cluster._1
+ val clusterPos = cluster._2
+ val clusterLocalSrcId = localSrcIds(clusterPos)
+ val srcIsActive = !srcMustBeActive || isActive(clusterSrcId)
+ if (srcIsActive || maySatisfyEither) {
+ var pos = clusterPos
+ val srcAttr =
+ if (tripletFields.useSrc) vertexAttrs(clusterLocalSrcId) else null.asInstanceOf[VD]
+ ctx.setSrcOnly(clusterSrcId, clusterLocalSrcId, srcAttr)
+ while (pos < size && localSrcIds(pos) == clusterLocalSrcId) {
+ val localDstId = localDstIds(pos)
+ val dstId = local2global(localDstId)
+ val dstIsActive = !dstMustBeActive || isActive(dstId)
+ val edgeIsActive =
+ if (maySatisfyEither) srcIsActive || dstIsActive else srcIsActive && dstIsActive
+ if (edgeIsActive) {
+ val dstAttr =
+ if (tripletFields.useDst) vertexAttrs(localDstId) else null.asInstanceOf[VD]
+ ctx.setRest(dstId, localDstId, dstAttr, data(pos))
+ sendMsg(ctx)
+ }
+ pos += 1
+ }
+ }
}
- override def next(): Edge[ED] = {
- assert(srcIds(pos) == srcId)
- edge.srcId = srcIds(pos)
- edge.dstId = dstIds(pos)
- edge.attr = data(pos)
- pos += 1
- edge
+ bitset.iterator.map { localId => (local2global(localId), aggregates(localId)) }
+ }
+}
+
+private class AggregatingEdgeContext[VD, ED, A](
+ mergeMsg: (A, A) => A,
+ aggregates: Array[A],
+ bitset: BitSet)
+ extends EdgeContext[VD, ED, A] {
+
+ private[this] var _srcId: VertexId = _
+ private[this] var _dstId: VertexId = _
+ private[this] var _localSrcId: Int = _
+ private[this] var _localDstId: Int = _
+ private[this] var _srcAttr: VD = _
+ private[this] var _dstAttr: VD = _
+ private[this] var _attr: ED = _
+
+ def set(
+ srcId: VertexId, dstId: VertexId,
+ localSrcId: Int, localDstId: Int,
+ srcAttr: VD, dstAttr: VD,
+ attr: ED) {
+ _srcId = srcId
+ _dstId = dstId
+ _localSrcId = localSrcId
+ _localDstId = localDstId
+ _srcAttr = srcAttr
+ _dstAttr = dstAttr
+ _attr = attr
+ }
+
+ def setSrcOnly(srcId: VertexId, localSrcId: Int, srcAttr: VD) {
+ _srcId = srcId
+ _localSrcId = localSrcId
+ _srcAttr = srcAttr
+ }
+
+ def setRest(dstId: VertexId, localDstId: Int, dstAttr: VD, attr: ED) {
+ _dstId = dstId
+ _localDstId = localDstId
+ _dstAttr = dstAttr
+ _attr = attr
+ }
+
+ override def srcId = _srcId
+ override def dstId = _dstId
+ override def srcAttr = _srcAttr
+ override def dstAttr = _dstAttr
+ override def attr = _attr
+
+ override def sendToSrc(msg: A) {
+ send(_localSrcId, msg)
+ }
+ override def sendToDst(msg: A) {
+ send(_localDstId, msg)
+ }
+
+ @inline private def send(localId: Int, msg: A) {
+ if (bitset.get(localId)) {
+ aggregates(localId) = mergeMsg(aggregates(localId), msg)
+ } else {
+ aggregates(localId) = msg
+ bitset.set(localId)
}
}
}
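
A toy model of the local-id scheme introduced above, independent of Spark (all names illustrative): with edges (10,20) and (10,30), the referenced global ids are assigned compact local ids in order of first appearance, which is exactly the role `global2local` and `local2global` play here.

    import scala.collection.mutable

    val edges = Seq((10L, 20L), (10L, 30L))
    val global2local = mutable.LinkedHashMap[Long, Int]()
    def localId(v: Long): Int = global2local.getOrElseUpdate(v, global2local.size)

    val (localSrcIds, localDstIds) =
      edges.map { case (s, d) => (localId(s), localId(d)) }.unzip
    val local2global: Array[Long] = global2local.keys.toArray

    assert(localSrcIds == Seq(0, 0) && localDstIds == Seq(1, 2))
    assert(local2global.toSeq == Seq(10L, 20L, 30L))

Local ids are valid indices into flat arrays such as `vertexAttrs`, which is why the aggregation buffers above can be plain arrays indexed by `_localSrcId`/`_localDstId` rather than hash maps keyed by 64-bit vertex ids.
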
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
index 2b6137be25..b0cb0fe47d 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
@@ -25,10 +25,11 @@ import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveVector}
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
+/** Constructs an EdgePartition from scratch. */
private[graphx]
class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: ClassTag](
size: Int = 64) {
- var edges = new PrimitiveVector[Edge[ED]](size)
+ private[this] val edges = new PrimitiveVector[Edge[ED]](size)
/** Add a new edge to the partition. */
def add(src: VertexId, dst: VertexId, d: ED) {
@@ -38,8 +39,67 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: Cla
def toEdgePartition: EdgePartition[ED, VD] = {
val edgeArray = edges.trim().array
Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering)
- val srcIds = new Array[VertexId](edgeArray.size)
- val dstIds = new Array[VertexId](edgeArray.size)
+ val localSrcIds = new Array[Int](edgeArray.size)
+ val localDstIds = new Array[Int](edgeArray.size)
+ val data = new Array[ED](edgeArray.size)
+ val index = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
+ val global2local = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
+ val local2global = new PrimitiveVector[VertexId]
+ var vertexAttrs = Array.empty[VD]
+ // Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and
+ // adding them to the index. Also populate a map from vertex id to a sequential local offset.
+ if (edgeArray.length > 0) {
+ index.update(edgeArray(0).srcId, 0)
+ var currSrcId: VertexId = edgeArray(0).srcId
+ var currLocalId = -1
+ var i = 0
+ while (i < edgeArray.size) {
+ val srcId = edgeArray(i).srcId
+ val dstId = edgeArray(i).dstId
+ localSrcIds(i) = global2local.changeValue(srcId,
+ { currLocalId += 1; local2global += srcId; currLocalId }, identity)
+ localDstIds(i) = global2local.changeValue(dstId,
+ { currLocalId += 1; local2global += dstId; currLocalId }, identity)
+ data(i) = edgeArray(i).attr
+ if (srcId != currSrcId) {
+ currSrcId = srcId
+ index.update(currSrcId, i)
+ }
+
+ i += 1
+ }
+ vertexAttrs = new Array[VD](currLocalId + 1)
+ }
+ new EdgePartition(
+ localSrcIds, localDstIds, data, index, global2local, local2global.trim().array, vertexAttrs,
+ None)
+ }
+}
+
+/**
+ * Constructs an EdgePartition from an existing EdgePartition with the same vertex set. This enables
+ * reuse of the local vertex ids. Intended for internal use in EdgePartition only.
+ */
+private[impl]
+class ExistingEdgePartitionBuilder[
+ @specialized(Long, Int, Double) ED: ClassTag, VD: ClassTag](
+ global2local: GraphXPrimitiveKeyOpenHashMap[VertexId, Int],
+ local2global: Array[VertexId],
+ vertexAttrs: Array[VD],
+ activeSet: Option[VertexSet],
+ size: Int = 64) {
+ private[this] val edges = new PrimitiveVector[EdgeWithLocalIds[ED]](size)
+
+ /** Add a new edge to the partition. */
+ def add(src: VertexId, dst: VertexId, localSrc: Int, localDst: Int, d: ED) {
+ edges += EdgeWithLocalIds(src, dst, localSrc, localDst, d)
+ }
+
+ def toEdgePartition: EdgePartition[ED, VD] = {
+ val edgeArray = edges.trim().array
+ Sorting.quickSort(edgeArray)(EdgeWithLocalIds.lexicographicOrdering)
+ val localSrcIds = new Array[Int](edgeArray.size)
+ val localDstIds = new Array[Int](edgeArray.size)
val data = new Array[ED](edgeArray.size)
val index = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
// Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and
@@ -49,8 +109,8 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: Cla
var currSrcId: VertexId = edgeArray(0).srcId
var i = 0
while (i < edgeArray.size) {
- srcIds(i) = edgeArray(i).srcId
- dstIds(i) = edgeArray(i).dstId
+ localSrcIds(i) = edgeArray(i).localSrcId
+ localDstIds(i) = edgeArray(i).localDstId
data(i) = edgeArray(i).attr
if (edgeArray(i).srcId != currSrcId) {
currSrcId = edgeArray(i).srcId
@@ -60,13 +120,24 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: Cla
}
}
- // Create and populate a VertexPartition with vids from the edges, but no attributes
- val vidsIter = srcIds.iterator ++ dstIds.iterator
- val vertexIds = new OpenHashSet[VertexId]
- vidsIter.foreach(vid => vertexIds.add(vid))
- val vertices = new VertexPartition(
- vertexIds, new Array[VD](vertexIds.capacity), vertexIds.getBitSet)
+ new EdgePartition(
+ localSrcIds, localDstIds, data, index, global2local, local2global, vertexAttrs, activeSet)
+ }
+}
- new EdgePartition(srcIds, dstIds, data, index, vertices)
+private[impl] case class EdgeWithLocalIds[@specialized ED](
+ srcId: VertexId, dstId: VertexId, localSrcId: Int, localDstId: Int, attr: ED)
+
+private[impl] object EdgeWithLocalIds {
+ implicit def lexicographicOrdering[ED] = new Ordering[EdgeWithLocalIds[ED]] {
+ override def compare(a: EdgeWithLocalIds[ED], b: EdgeWithLocalIds[ED]): Int = {
+ if (a.srcId == b.srcId) {
+ if (a.dstId == b.dstId) 0
+ else if (a.dstId < b.dstId) -1
+ else 1
+ } else if (a.srcId < b.srcId) -1
+ else 1
+ }
}
+
}
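
The lexicographic sort is what makes the clustered `index` possible: after sorting, all edges sharing a source id are contiguous, so a single offset per source id suffices. A quick illustration with plain tuples (illustrative values only):

    val sorted = Seq((3L, 1L), (1L, 2L), (1L, 1L), (3L, 0L)).sorted
    // sorted == Seq((1,1), (1,2), (3,0), (3,1)): edges cluster by source id,
    // so the index records offset 0 for source 1 and offset 2 for source 3.
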
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
deleted file mode 100644
index 56f79a7097..0000000000
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.graphx.impl
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
-
-/**
- * The Iterator type returned when constructing edge triplets. This could be an anonymous class in
- * EdgePartition.tripletIterator, but we name it here explicitly so it is easier to debug / profile.
- */
-private[impl]
-class EdgeTripletIterator[VD: ClassTag, ED: ClassTag](
- val edgePartition: EdgePartition[ED, VD],
- val includeSrc: Boolean,
- val includeDst: Boolean)
- extends Iterator[EdgeTriplet[VD, ED]] {
-
- // Current position in the array.
- private var pos = 0
-
- override def hasNext: Boolean = pos < edgePartition.size
-
- override def next() = {
- val triplet = new EdgeTriplet[VD, ED]
- triplet.srcId = edgePartition.srcIds(pos)
- if (includeSrc) {
- triplet.srcAttr = edgePartition.vertices(triplet.srcId)
- }
- triplet.dstId = edgePartition.dstIds(pos)
- if (includeDst) {
- triplet.dstAttr = edgePartition.vertices(triplet.dstId)
- }
- triplet.attr = edgePartition.data(pos)
- pos += 1
- triplet
- }
-}
-
-/**
- * An Iterator type for internal use that reuses EdgeTriplet objects. This could be an anonymous
- * class in EdgePartition.upgradeIterator, but we name it here explicitly so it is easier to debug /
- * profile.
- */
-private[impl]
-class ReusingEdgeTripletIterator[VD: ClassTag, ED: ClassTag](
- val edgeIter: Iterator[Edge[ED]],
- val edgePartition: EdgePartition[ED, VD],
- val includeSrc: Boolean,
- val includeDst: Boolean)
- extends Iterator[EdgeTriplet[VD, ED]] {
-
- private val triplet = new EdgeTriplet[VD, ED]
-
- override def hasNext = edgeIter.hasNext
-
- override def next() = {
- triplet.set(edgeIter.next())
- if (includeSrc) {
- triplet.srcAttr = edgePartition.vertices(triplet.srcId)
- }
- if (includeDst) {
- triplet.dstAttr = edgePartition.vertices(triplet.dstId)
- }
- triplet
- }
-}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 33f35cfb69..a1fe158b7b 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -23,7 +23,6 @@ import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.storage.StorageLevel
-
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl.GraphImpl._
import org.apache.spark.graphx.util.BytecodeUtils
@@ -127,13 +126,12 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
}
override def mapTriplets[ED2: ClassTag](
- f: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
+ f: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2],
+ tripletFields: TripletFields): Graph[VD, ED2] = {
vertices.cache()
- val mapUsesSrcAttr = accessesVertexAttr(f, "srcAttr")
- val mapUsesDstAttr = accessesVertexAttr(f, "dstAttr")
- replicatedVertexView.upgrade(vertices, mapUsesSrcAttr, mapUsesDstAttr)
+ replicatedVertexView.upgrade(vertices, tripletFields.useSrc, tripletFields.useDst)
val newEdges = replicatedVertexView.edges.mapEdgePartitions { (pid, part) =>
- part.map(f(pid, part.tripletIterator(mapUsesSrcAttr, mapUsesDstAttr)))
+ part.map(f(pid, part.tripletIterator(tripletFields.useSrc, tripletFields.useDst)))
}
new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
}
@@ -171,15 +169,38 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
override def mapReduceTriplets[A: ClassTag](
mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
reduceFunc: (A, A) => A,
- activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None): VertexRDD[A] = {
+ activeSetOpt: Option[(VertexRDD[_], EdgeDirection)]): VertexRDD[A] = {
+
+ def sendMsg(ctx: EdgeContext[VD, ED, A]) {
+ mapFunc(ctx.toEdgeTriplet).foreach { kv =>
+ val id = kv._1
+ val msg = kv._2
+ if (id == ctx.srcId) {
+ ctx.sendToSrc(msg)
+ } else {
+ assert(id == ctx.dstId)
+ ctx.sendToDst(msg)
+ }
+ }
+ }
- vertices.cache()
+ val mapUsesSrcAttr = accessesVertexAttr(mapFunc, "srcAttr")
+ val mapUsesDstAttr = accessesVertexAttr(mapFunc, "dstAttr")
+ val tripletFields = new TripletFields(mapUsesSrcAttr, mapUsesDstAttr, true)
+
+ aggregateMessagesWithActiveSet(sendMsg, reduceFunc, tripletFields, activeSetOpt)
+ }
+
+ override def aggregateMessagesWithActiveSet[A: ClassTag](
+ sendMsg: EdgeContext[VD, ED, A] => Unit,
+ mergeMsg: (A, A) => A,
+ tripletFields: TripletFields,
+ activeSetOpt: Option[(VertexRDD[_], EdgeDirection)]): VertexRDD[A] = {
+ vertices.cache()
// For each vertex, replicate its attribute only to partitions where it is
// in the relevant position in an edge.
- val mapUsesSrcAttr = accessesVertexAttr(mapFunc, "srcAttr")
- val mapUsesDstAttr = accessesVertexAttr(mapFunc, "dstAttr")
- replicatedVertexView.upgrade(vertices, mapUsesSrcAttr, mapUsesDstAttr)
+ replicatedVertexView.upgrade(vertices, tripletFields.useSrc, tripletFields.useDst)
val view = activeSetOpt match {
case Some((activeSet, _)) =>
replicatedVertexView.withActiveSet(activeSet)
@@ -193,42 +214,40 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
case (pid, edgePartition) =>
// Choose scan method
val activeFraction = edgePartition.numActives.getOrElse(0) / edgePartition.indexSize.toFloat
- val edgeIter = activeDirectionOpt match {
+ activeDirectionOpt match {
case Some(EdgeDirection.Both) =>
if (activeFraction < 0.8) {
- edgePartition.indexIterator(srcVertexId => edgePartition.isActive(srcVertexId))
- .filter(e => edgePartition.isActive(e.dstId))
+ edgePartition.aggregateMessagesIndexScan(sendMsg, mergeMsg, tripletFields,
+ true, true, false)
} else {
- edgePartition.iterator.filter(e =>
- edgePartition.isActive(e.srcId) && edgePartition.isActive(e.dstId))
+ edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
+ true, true, false)
}
case Some(EdgeDirection.Either) =>
// TODO: Because we only have a clustered index on the source vertex ID, we can't filter
// the index here. Instead we have to scan all edges and then do the filter.
- edgePartition.iterator.filter(e =>
- edgePartition.isActive(e.srcId) || edgePartition.isActive(e.dstId))
+ edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
+ true, true, true)
case Some(EdgeDirection.Out) =>
if (activeFraction < 0.8) {
- edgePartition.indexIterator(srcVertexId => edgePartition.isActive(srcVertexId))
+ edgePartition.aggregateMessagesIndexScan(sendMsg, mergeMsg, tripletFields,
+ true, false, false)
} else {
- edgePartition.iterator.filter(e => edgePartition.isActive(e.srcId))
+ edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
+ true, false, false)
}
case Some(EdgeDirection.In) =>
- edgePartition.iterator.filter(e => edgePartition.isActive(e.dstId))
+ edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
+ false, true, false)
case _ => // None
- edgePartition.iterator
+ edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
+ false, false, false)
}
-
- // Scan edges and run the map function
- val mapOutputs = edgePartition.upgradeIterator(edgeIter, mapUsesSrcAttr, mapUsesDstAttr)
- .flatMap(mapFunc(_))
- // Note: This doesn't allow users to send messages to arbitrary vertices.
- edgePartition.vertices.aggregateUsingIndex(mapOutputs, reduceFunc).iterator
- }).setName("GraphImpl.mapReduceTriplets - preAgg")
+ }).setName("GraphImpl.aggregateMessages - preAgg")
// do the final reduction reusing the index map
- vertices.aggregateUsingIndex(preAgg, reduceFunc)
- } // end of mapReduceTriplets
+ vertices.aggregateUsingIndex(preAgg, mergeMsg)
+ }
override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
(other: RDD[(VertexId, U)])
@@ -306,9 +325,7 @@ object GraphImpl {
vertices: VertexRDD[VD],
edges: EdgeRDD[ED, _]): GraphImpl[VD, ED] = {
// Convert the vertex partitions in edges to the correct type
- val newEdges = edges.mapEdgePartitions(
- (pid, part) => part.withVertices(part.vertices.map(
- (vid, attr) => null.asInstanceOf[VD])))
+ val newEdges = edges.mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
GraphImpl.fromExistingRDDs(vertices, newEdges)
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
index 7a7fa91aad..eb3c997e0f 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
@@ -56,11 +56,9 @@ object RoutingTablePartition {
// Determine which positions each vertex id appears in using a map where the low 2 bits
// represent src and dst
val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, Byte]
- edgePartition.srcIds.iterator.foreach { srcId =>
- map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
- }
- edgePartition.dstIds.iterator.foreach { dstId =>
- map.changeValue(dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
+ edgePartition.iterator.foreach { e =>
+ map.changeValue(e.srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
+ map.changeValue(e.dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
}
map.iterator.map { vidAndPosition =>
val vid = vidAndPosition._1
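
The 2-bit position encoding composes under bitwise OR: a vertex seen as a source gets 0x1, as a destination 0x2, and one appearing in both positions accumulates 0x3. For instance (illustrative):

    var pos: Byte = 0x1          // first seen as a source
    pos = (pos | 0x2).toByte     // later seen as a destination
    assert(pos == 0x3)           // appears in both positions
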
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index 257e2f3a36..e40ae0d615 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -85,7 +85,7 @@ object PageRank extends Logging {
// Associate the degree with each vertex
.outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
// Set the weight on the edges based on the degree
- .mapTriplets( e => 1.0 / e.srcAttr )
+ .mapTriplets( e => 1.0 / e.srcAttr, TripletFields.SrcOnly )
// Set the vertex attributes to the initial pagerank values
.mapVertices( (id, attr) => resetProb )
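
Passing TripletFields.SrcOnly here records that the map function reads only the source attribute, so GraphX can avoid shipping destination attributes to the edge partitions. A minimal sketch of the same hint as a standalone helper, assuming a Graph[Int, ED] whose edge sources all have nonzero out-degree:

    import org.apache.spark.graphx._

    // Weight each edge by 1/outDegree(src), declaring that only srcAttr is read.
    def weightByOutDegree[ED](graph: Graph[Int, ED]): Graph[Int, Double] =
      graph
        .outerJoinVertices(graph.outDegrees) { (_, _, deg) => deg.getOrElse(0) }
        .mapTriplets(et => 1.0 / et.srcAttr, TripletFields.SrcOnly)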
@@ -96,8 +96,8 @@ object PageRank extends Logging {
// Compute the outgoing rank contributions of each vertex, perform local preaggregation, and
// do the final aggregation at the receiving vertices. Requires a shuffle for aggregation.
- val rankUpdates = rankGraph.mapReduceTriplets[Double](
- e => Iterator((e.dstId, e.srcAttr * e.attr)), _ + _)
+ val rankUpdates = rankGraph.aggregateMessages[Double](
+ ctx => ctx.sendToDst(ctx.srcAttr * ctx.attr), _ + _, TripletFields.SrcAndEdge)
// Apply the final rank updates to get the new ranks, using join to preserve ranks of vertices
// that didn't receive a message. Requires a shuffle for broadcasting updated ranks to the
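
Put together, one full rank update outside the library looks roughly like this: a sketch assuming rankGraph: Graph[Double, Double] carries ranks on vertices and out-degree-normalized weights on edges, as in the hunk above.

    import org.apache.spark.graphx._

    val resetProb = 0.15
    val rankUpdates: VertexRDD[Double] = rankGraph.aggregateMessages[Double](
      ctx => ctx.sendToDst(ctx.srcAttr * ctx.attr), // contribution along each edge
      _ + _,                                        // sum per destination vertex
      TripletFields.SrcAndEdge)                     // dstAttr is never read
    val nextRanks: Graph[Double, Double] = rankGraph.outerJoinVertices(rankUpdates) {
      (_, _, msgSum) => resetProb + (1.0 - resetProb) * msgSum.getOrElse(0.0)
    }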
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
index ccd7de537b..f58587e10a 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
@@ -74,9 +74,9 @@ object SVDPlusPlus {
var g = Graph.fromEdges(edges, defaultF(conf.rank)).cache()
// Calculate initial bias and norm
- val t0 = g.mapReduceTriplets(
- et => Iterator((et.srcId, (1L, et.attr)), (et.dstId, (1L, et.attr))),
- (g1: (Long, Double), g2: (Long, Double)) => (g1._1 + g2._1, g1._2 + g2._2))
+ val t0 = g.aggregateMessages[(Long, Double)](
+ ctx => { ctx.sendToSrc((1L, ctx.attr)); ctx.sendToDst((1L, ctx.attr)) },
+ (g1, g2) => (g1._1 + g2._1, g1._2 + g2._2))
g = g.outerJoinVertices(t0) {
(vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double),
@@ -84,15 +84,17 @@ object SVDPlusPlus {
(vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1))
}
- def mapTrainF(conf: Conf, u: Double)
- (et: EdgeTriplet[(DoubleMatrix, DoubleMatrix, Double, Double), Double])
- : Iterator[(VertexId, (DoubleMatrix, DoubleMatrix, Double))] = {
- val (usr, itm) = (et.srcAttr, et.dstAttr)
+ def sendMsgTrainF(conf: Conf, u: Double)
+ (ctx: EdgeContext[
+ (DoubleMatrix, DoubleMatrix, Double, Double),
+ Double,
+ (DoubleMatrix, DoubleMatrix, Double)]) {
+ val (usr, itm) = (ctx.srcAttr, ctx.dstAttr)
val (p, q) = (usr._1, itm._1)
var pred = u + usr._3 + itm._3 + q.dot(usr._2)
pred = math.max(pred, conf.minVal)
pred = math.min(pred, conf.maxVal)
- val err = et.attr - pred
+ val err = ctx.attr - pred
val updateP = q.mul(err)
.subColumnVector(p.mul(conf.gamma7))
.mul(conf.gamma2)
@@ -102,16 +104,16 @@ object SVDPlusPlus {
val updateY = q.mul(err * usr._4)
.subColumnVector(itm._2.mul(conf.gamma7))
.mul(conf.gamma2)
- Iterator((et.srcId, (updateP, updateY, (err - conf.gamma6 * usr._3) * conf.gamma1)),
- (et.dstId, (updateQ, updateY, (err - conf.gamma6 * itm._3) * conf.gamma1)))
+ ctx.sendToSrc((updateP, updateY, (err - conf.gamma6 * usr._3) * conf.gamma1))
+ ctx.sendToDst((updateQ, updateY, (err - conf.gamma6 * itm._3) * conf.gamma1))
}
for (i <- 0 until conf.maxIters) {
// Phase 1, calculate pu + |N(u)|^(-0.5)*sum(y) for user nodes
g.cache()
- val t1 = g.mapReduceTriplets(
- et => Iterator((et.srcId, et.dstAttr._2)),
- (g1: DoubleMatrix, g2: DoubleMatrix) => g1.addColumnVector(g2))
+ val t1 = g.aggregateMessages[DoubleMatrix](
+ ctx => ctx.sendToSrc(ctx.dstAttr._2),
+ (g1, g2) => g1.addColumnVector(g2))
g = g.outerJoinVertices(t1) {
(vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double),
msg: Option[DoubleMatrix]) =>
@@ -121,8 +123,8 @@ object SVDPlusPlus {
// Phase 2, update p for user nodes and q, y for item nodes
g.cache()
- val t2 = g.mapReduceTriplets(
- mapTrainF(conf, u),
+ val t2 = g.aggregateMessages(
+ sendMsgTrainF(conf, u),
(g1: (DoubleMatrix, DoubleMatrix, Double), g2: (DoubleMatrix, DoubleMatrix, Double)) =>
(g1._1.addColumnVector(g2._1), g1._2.addColumnVector(g2._2), g1._3 + g2._3))
g = g.outerJoinVertices(t2) {
@@ -135,20 +137,18 @@ object SVDPlusPlus {
}
// calculate error on training set
- def mapTestF(conf: Conf, u: Double)
- (et: EdgeTriplet[(DoubleMatrix, DoubleMatrix, Double, Double), Double])
- : Iterator[(VertexId, Double)] =
- {
- val (usr, itm) = (et.srcAttr, et.dstAttr)
+ def sendMsgTestF(conf: Conf, u: Double)
+ (ctx: EdgeContext[(DoubleMatrix, DoubleMatrix, Double, Double), Double, Double]) {
+ val (usr, itm) = (ctx.srcAttr, ctx.dstAttr)
val (p, q) = (usr._1, itm._1)
var pred = u + usr._3 + itm._3 + q.dot(usr._2)
pred = math.max(pred, conf.minVal)
pred = math.min(pred, conf.maxVal)
- val err = (et.attr - pred) * (et.attr - pred)
- Iterator((et.dstId, err))
+ val err = (ctx.attr - pred) * (ctx.attr - pred)
+ ctx.sendToDst(err)
}
g.cache()
- val t3 = g.mapReduceTriplets(mapTestF(conf, u), (g1: Double, g2: Double) => g1 + g2)
+ val t3 = g.aggregateMessages[Double](sendMsgTestF(conf, u), _ + _)
g = g.outerJoinVertices(t3) {
(vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double), msg: Option[Double]) =>
if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd
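
Every conversion in this file follows the same mechanical recipe: a mapFunc that returned Iterator((vid, msg), ...) becomes a sendMsg procedure that writes the same messages through the EdgeContext, avoiding the per-edge Iterator allocation. A distilled before/after sketch using hypothetical placeholder types:

    import org.apache.spark.graphx._

    object MigrationSketch {
      // Hypothetical stand-ins for the real vertex, edge, and message types.
      type VD = Double; type ED = Double; type Msg = Double

      // Old API: allocate an Iterator of (VertexId, message) pairs per edge.
      def mapFunc(et: EdgeTriplet[VD, ED]): Iterator[(VertexId, Msg)] =
        Iterator((et.srcId, et.attr), (et.dstId, et.attr))

      // New API: push the same messages through the context, no allocation.
      def sendMsg(ctx: EdgeContext[VD, ED, Msg]): Unit = {
        ctx.sendToSrc(ctx.attr)
        ctx.sendToDst(ctx.attr)
      }
    }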
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
index 7c396e6e66..daf162085e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
@@ -61,26 +61,27 @@ object TriangleCount {
(vid, _, optSet) => optSet.getOrElse(null)
}
    // Edge function computes the intersection of the smaller vertex's set with the larger one's
- def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Iterator[(VertexId, Int)] = {
- assert(et.srcAttr != null)
- assert(et.dstAttr != null)
- val (smallSet, largeSet) = if (et.srcAttr.size < et.dstAttr.size) {
- (et.srcAttr, et.dstAttr)
+ def edgeFunc(ctx: EdgeContext[VertexSet, ED, Int]) {
+ assert(ctx.srcAttr != null)
+ assert(ctx.dstAttr != null)
+ val (smallSet, largeSet) = if (ctx.srcAttr.size < ctx.dstAttr.size) {
+ (ctx.srcAttr, ctx.dstAttr)
} else {
- (et.dstAttr, et.srcAttr)
+ (ctx.dstAttr, ctx.srcAttr)
}
val iter = smallSet.iterator
var counter: Int = 0
while (iter.hasNext) {
val vid = iter.next()
- if (vid != et.srcId && vid != et.dstId && largeSet.contains(vid)) {
+ if (vid != ctx.srcId && vid != ctx.dstId && largeSet.contains(vid)) {
counter += 1
}
}
- Iterator((et.srcId, counter), (et.dstId, counter))
+ ctx.sendToSrc(counter)
+ ctx.sendToDst(counter)
}
// compute the intersection along edges
- val counters: VertexRDD[Int] = setGraph.mapReduceTriplets(edgeFunc, _ + _)
+ val counters: VertexRDD[Int] = setGraph.aggregateMessages(edgeFunc, _ + _)
// Merge counters with the graph and divide by two since each triangle is counted twice
g.outerJoinVertices(counters) {
(vid, _, optCounter: Option[Int]) =>
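
Iterating the smaller of the two adjacency sets keeps each edge's work proportional to min(|N(src)|, |N(dst)|). A sketch of the same scan in isolation, over plain Scala sets:

    // Count common neighbors of src and dst, excluding the endpoints themselves.
    def countCommon(a: Set[Long], b: Set[Long], src: Long, dst: Long): Int = {
      val (small, large) = if (a.size < b.size) (a, b) else (b, a)
      var counter = 0
      for (vid <- small if vid != src && vid != dst && large.contains(vid))
        counter += 1
      counter
    }
    // countCommon(Set(2L, 3L), Set(1L, 3L, 4L), 1L, 2L) == 1: vertex 3 closes the triangle.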
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 6506bac73d..df773db6e4 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -118,7 +118,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
// Each vertex should be replicated to at most 2 * sqrt(p) partitions
val partitionSets = partitionedGraph.edges.partitionsRDD.mapPartitions { iter =>
val part = iter.next()._2
- Iterator((part.srcIds ++ part.dstIds).toSet)
+ Iterator((part.iterator.flatMap(e => Iterator(e.srcId, e.dstId))).toSet)
}.collect
if (!verts.forall(id => partitionSets.count(_.contains(id)) <= bound)) {
val numFailures = verts.count(id => partitionSets.count(_.contains(id)) > bound)
@@ -130,7 +130,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
// This should not be true for the default hash partitioning
val partitionSetsUnpartitioned = graph.edges.partitionsRDD.mapPartitions { iter =>
val part = iter.next()._2
- Iterator((part.srcIds ++ part.dstIds).toSet)
+ Iterator((part.iterator.flatMap(e => Iterator(e.srcId, e.dstId))).toSet)
}.collect
assert(verts.exists(id => partitionSetsUnpartitioned.count(_.contains(id)) > bound))
@@ -318,6 +318,21 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
}
+ test("aggregateMessages") {
+ withSpark { sc =>
+ val n = 5
+ val agg = starGraph(sc, n).aggregateMessages[String](
+ ctx => {
+ if (ctx.dstAttr != null) {
+ throw new Exception(
+ "expected ctx.dstAttr to be null due to TripletFields, but it was " + ctx.dstAttr)
+ }
+ ctx.sendToDst(ctx.srcAttr)
+ }, _ + _, TripletFields.SrcOnly)
+ assert(agg.collect().toSet === (1 to n).map(x => (x: VertexId, "v")).toSet)
+ }
+ }
+
test("outerJoinVertices") {
withSpark { sc =>
val n = 5
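
The new test also pins down the TripletFields contract: with SrcOnly, dstAttr must arrive as null at the send function. A rough spark-shell equivalent, assuming the suite's starGraph helper builds a star with every vertex attribute set to "v":

    val star = Graph.fromEdgeTuples(
      sc.parallelize((1 to 5).map(x => (0L, x.toLong))), "v")
    val agg = star.aggregateMessages[String](
      ctx => ctx.sendToDst(ctx.srcAttr), _ + _, TripletFields.SrcOnly)
    agg.collect().foreach(println) // each leaf 1..5 receives "v"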
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
index db1dac6160..515f3a9cd0 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
@@ -82,29 +82,6 @@ class EdgePartitionSuite extends FunSuite {
assert(edgePartition.groupEdges(_ + _).iterator.map(_.copy()).toList === groupedEdges)
}
- test("upgradeIterator") {
- val edges = List((0, 1, 0), (1, 0, 0))
- val verts = List((0L, 1), (1L, 2))
- val part = makeEdgePartition(edges).updateVertices(verts.iterator)
- assert(part.upgradeIterator(part.iterator).map(_.toTuple).toList ===
- part.tripletIterator().toList.map(_.toTuple))
- }
-
- test("indexIterator") {
- val edgesFrom0 = List(Edge(0, 1, 0))
- val edgesFrom1 = List(Edge(1, 0, 0), Edge(1, 2, 0))
- val sortedEdges = edgesFrom0 ++ edgesFrom1
- val builder = new EdgePartitionBuilder[Int, Nothing]
- for (e <- Random.shuffle(sortedEdges)) {
- builder.add(e.srcId, e.dstId, e.attr)
- }
-
- val edgePartition = builder.toEdgePartition
- assert(edgePartition.iterator.map(_.copy()).toList === sortedEdges)
- assert(edgePartition.indexIterator(_ == 0).map(_.copy()).toList === edgesFrom0)
- assert(edgePartition.indexIterator(_ == 1).map(_.copy()).toList === edgesFrom1)
- }
-
test("innerJoin") {
val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
val bList = List((0, 1, 0), (1, 0, 0), (1, 1, 0), (3, 4, 0), (5, 5, 0))
@@ -125,8 +102,18 @@ class EdgePartitionSuite extends FunSuite {
assert(ep.numActives == Some(2))
}
+ test("tripletIterator") {
+ val builder = new EdgePartitionBuilder[Int, Int]
+ builder.add(1, 2, 0)
+ builder.add(1, 3, 0)
+ builder.add(1, 4, 0)
+ val ep = builder.toEdgePartition
+ val result = ep.tripletIterator().toList.map(et => (et.srcId, et.dstId))
+ assert(result === Seq((1, 2), (1, 3), (1, 4)))
+ }
+
test("serialization") {
- val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
+ val aList = List((0, 1, 1), (1, 0, 2), (1, 2, 3), (5, 4, 4), (5, 5, 5))
val a: EdgePartition[Int, Int] = makeEdgePartition(aList)
val javaSer = new JavaSerializer(new SparkConf())
val conf = new SparkConf()
@@ -135,11 +122,7 @@ class EdgePartitionSuite extends FunSuite {
for (ser <- List(javaSer, kryoSer); s = ser.newInstance()) {
val aSer: EdgePartition[Int, Int] = s.deserialize(s.serialize(a))
- assert(aSer.srcIds.toList === a.srcIds.toList)
- assert(aSer.dstIds.toList === a.dstIds.toList)
- assert(aSer.data.toList === a.data.toList)
- assert(aSer.index != null)
- assert(aSer.vertices.iterator.toSet === a.vertices.iterator.toSet)
+ assert(aSer.tripletIterator().toList === a.tripletIterator().toList)
}
}
}
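
The serialization test now round-trips through tripletIterator rather than poking at the partition's internal arrays, so the assertion stays valid as the layout changes. The suite's makeEdgePartition helper is defined outside this hunk; a plausible, hypothetical reconstruction consistent with its call sites above:

    import scala.reflect.ClassTag
    import org.apache.spark.graphx.impl.{EdgePartition, EdgePartitionBuilder}

    // Hypothetical sketch of the helper used above: build an EdgePartition
    // from (src, dst, attr) triples via the public builder.
    def makeEdgePartition[A: ClassTag](xs: Iterable[(Int, Int, A)]): EdgePartition[A, Int] = {
      val builder = new EdgePartitionBuilder[A, Int]
      for ((src, dst, attr) <- xs) builder.add(src, dst, attr)
      builder.toEdgePartition
    }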
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala
deleted file mode 100644
index 49b2704390..0000000000
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.graphx.impl
-
-import scala.reflect.ClassTag
-import scala.util.Random
-
-import org.scalatest.FunSuite
-
-import org.apache.spark.graphx._
-
-class EdgeTripletIteratorSuite extends FunSuite {
- test("iterator.toList") {
- val builder = new EdgePartitionBuilder[Int, Int]
- builder.add(1, 2, 0)
- builder.add(1, 3, 0)
- builder.add(1, 4, 0)
- val iter = new EdgeTripletIterator[Int, Int](builder.toEdgePartition, true, true)
- val result = iter.toList.map(et => (et.srcId, et.dstId))
- assert(result === Seq((1, 2), (1, 3), (1, 4)))
- }
-}