path: root/graphx/src/test
author     Ankur Dave <ankurdave@gmail.com>      2014-11-11 23:38:27 -0800
committer  Reynold Xin <rxin@databricks.com>     2014-11-11 23:38:27 -0800
commit     faeb41de215d3ac567ce72a43ab242ad433ca93e (patch)
tree       36f408ec2e7a014ff07a2337e6939eb67ee7387c /graphx/src/test
parent     2ef016b130a48869cf81fe6cf147ef2b1e79d674 (diff)
[SPARK-3936] Add aggregateMessages, which supersedes mapReduceTriplets
aggregateMessages enables neighborhood computation similarly to mapReduceTriplets, but it introduces two API improvements:

1. Messages are sent using an imperative interface based on EdgeContext rather than by returning an iterator of messages.

2. Rather than attempting bytecode inspection, the required triplet fields must be explicitly specified by the user by passing a TripletFields object.

This fixes SPARK-3936. Additionally, this PR includes the following optimizations for aggregateMessages and EdgePartition:

1. EdgePartition now stores local vertex ids instead of global ids. This avoids hash lookups when looking up vertex attributes and aggregating messages.

2. Internal iterators in aggregateMessages are inlined into a while loop.

In total, these optimizations were tested to provide a 37% speedup on PageRank (uk-2007-05 graph, 10 iterations, 16 r3.2xlarge machines, sped up from 513 s to 322 s).

Subsumes apache/spark#2815. Also fixes SPARK-4173.

Author: Ankur Dave <ankurdave@gmail.com>

Closes #3100 from ankurdave/aggregateMessages and squashes the following commits:

f5b65d0 [Ankur Dave] Address @rxin comments on apache/spark#3054 and apache/spark#3100
1e80aca [Ankur Dave] Add aggregateMessages, which supersedes mapReduceTriplets
194a2df [Ankur Dave] Test triplet iterator in EdgePartition serialization test
e0f8ecc [Ankur Dave] Take activeSet in ExistingEdgePartitionBuilder
c85076d [Ankur Dave] Readability improvements
b567be2 [Ankur Dave] iter.foreach -> while loop
4a566dc [Ankur Dave] Optimizations for mapReduceTriplets and EdgePartition
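For context, a minimal sketch of the new interface (the driver setup, example graph, and object name are illustrative assumptions, not part of the patch). Each leaf vertex sums the attributes its in-neighbors send via ctx.sendToDst; passing TripletFields.SrcOnly, as the new GraphSuite test does, declares that dstAttr is never read and so need not be shipped:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

object AggregateMessagesSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("sketch"))
    // A small star graph: the center (id 0) points at leaves 1..5.
    val edges = sc.parallelize((1 to 5).map(i => Edge(0L, i.toLong, 1)))
    val graph = Graph.fromEdges(edges, defaultValue = 1.0)
    // sendMsg receives an EdgeContext and sends messages imperatively,
    // instead of returning an Iterator as mapReduceTriplets required.
    val sums: VertexRDD[Double] = graph.aggregateMessages[Double](
      ctx => ctx.sendToDst(ctx.srcAttr),  // read only the source attribute
      _ + _,                              // commutative, associative merge
      TripletFields.SrcOnly)              // declare which triplet fields are used
    sums.collect().foreach(println)       // each leaf receives 1.0 from the center
    sc.stop()
  }
}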
Diffstat (limited to 'graphx/src/test')
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala                     19
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala        41
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala  37
3 files changed, 29 insertions(+), 68 deletions(-)
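The diffs below exercise these changes only from the test side. As a rough illustration of optimization 1 in the commit message, here is a hypothetical sketch (simplified, invented names, not the actual EdgePartition internals) of remapping global vertex ids to dense local ids so the per-edge aggregation loop does array indexing instead of hash probes; it also uses the inlined while loop of optimization 2:

import scala.collection.mutable

// Hypothetical sketch of the local-id technique, not EdgePartition itself.
class LocalIdSketch(dstIds: Array[Long]) {
  private val global2local = mutable.HashMap.empty[Long, Int]
  private val local2global = mutable.ArrayBuffer.empty[Long]
  private def localId(g: Long): Int =
    global2local.getOrElseUpdate(g, { local2global += g; local2global.size - 1 })

  // Hash lookups happen once here, at construction time...
  private val localDstIds: Array[Int] = dstIds.map(localId)

  // ...so the hot loop is pure array arithmetic.
  def inDegrees(): Iterator[(Long, Int)] = {
    val agg = new Array[Int](local2global.size)
    var i = 0
    while (i < localDstIds.length) {  // inlined while loop, as in the commit
      agg(localDstIds(i)) += 1
      i += 1
    }
    Iterator.tabulate(agg.length)(l => (local2global(l), agg(l)))
  }
}

With agg as a flat array, merging the messages destined for one vertex is an increment at a fixed offset; the global id is only recovered at the end via local2global.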
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 6506bac73d..df773db6e4 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -118,7 +118,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
// Each vertex should be replicated to at most 2 * sqrt(p) partitions
val partitionSets = partitionedGraph.edges.partitionsRDD.mapPartitions { iter =>
val part = iter.next()._2
- Iterator((part.srcIds ++ part.dstIds).toSet)
+ Iterator((part.iterator.flatMap(e => Iterator(e.srcId, e.dstId))).toSet)
}.collect
if (!verts.forall(id => partitionSets.count(_.contains(id)) <= bound)) {
val numFailures = verts.count(id => partitionSets.count(_.contains(id)) > bound)
@@ -130,7 +130,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
// This should not be true for the default hash partitioning
val partitionSetsUnpartitioned = graph.edges.partitionsRDD.mapPartitions { iter =>
val part = iter.next()._2
- Iterator((part.srcIds ++ part.dstIds).toSet)
+ Iterator((part.iterator.flatMap(e => Iterator(e.srcId, e.dstId))).toSet)
}.collect
assert(verts.exists(id => partitionSetsUnpartitioned.count(_.contains(id)) > bound))
@@ -318,6 +318,21 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
}
+ test("aggregateMessages") {
+ withSpark { sc =>
+ val n = 5
+ val agg = starGraph(sc, n).aggregateMessages[String](
+ ctx => {
+ if (ctx.dstAttr != null) {
+ throw new Exception(
+ "expected ctx.dstAttr to be null due to TripletFields, but it was " + ctx.dstAttr)
+ }
+ ctx.sendToDst(ctx.srcAttr)
+ }, _ + _, TripletFields.SrcOnly)
+ assert(agg.collect().toSet === (1 to n).map(x => (x: VertexId, "v")).toSet)
+ }
+ }
+
test("outerJoinVertices") {
withSpark { sc =>
val n = 5
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
index db1dac6160..515f3a9cd0 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
@@ -82,29 +82,6 @@ class EdgePartitionSuite extends FunSuite {
assert(edgePartition.groupEdges(_ + _).iterator.map(_.copy()).toList === groupedEdges)
}
- test("upgradeIterator") {
- val edges = List((0, 1, 0), (1, 0, 0))
- val verts = List((0L, 1), (1L, 2))
- val part = makeEdgePartition(edges).updateVertices(verts.iterator)
- assert(part.upgradeIterator(part.iterator).map(_.toTuple).toList ===
- part.tripletIterator().toList.map(_.toTuple))
- }
-
- test("indexIterator") {
- val edgesFrom0 = List(Edge(0, 1, 0))
- val edgesFrom1 = List(Edge(1, 0, 0), Edge(1, 2, 0))
- val sortedEdges = edgesFrom0 ++ edgesFrom1
- val builder = new EdgePartitionBuilder[Int, Nothing]
- for (e <- Random.shuffle(sortedEdges)) {
- builder.add(e.srcId, e.dstId, e.attr)
- }
-
- val edgePartition = builder.toEdgePartition
- assert(edgePartition.iterator.map(_.copy()).toList === sortedEdges)
- assert(edgePartition.indexIterator(_ == 0).map(_.copy()).toList === edgesFrom0)
- assert(edgePartition.indexIterator(_ == 1).map(_.copy()).toList === edgesFrom1)
- }
-
test("innerJoin") {
val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
val bList = List((0, 1, 0), (1, 0, 0), (1, 1, 0), (3, 4, 0), (5, 5, 0))
@@ -125,8 +102,18 @@ class EdgePartitionSuite extends FunSuite {
assert(ep.numActives == Some(2))
}
+ test("tripletIterator") {
+ val builder = new EdgePartitionBuilder[Int, Int]
+ builder.add(1, 2, 0)
+ builder.add(1, 3, 0)
+ builder.add(1, 4, 0)
+ val ep = builder.toEdgePartition
+ val result = ep.tripletIterator().toList.map(et => (et.srcId, et.dstId))
+ assert(result === Seq((1, 2), (1, 3), (1, 4)))
+ }
+
test("serialization") {
- val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
+ val aList = List((0, 1, 1), (1, 0, 2), (1, 2, 3), (5, 4, 4), (5, 5, 5))
val a: EdgePartition[Int, Int] = makeEdgePartition(aList)
val javaSer = new JavaSerializer(new SparkConf())
val conf = new SparkConf()
@@ -135,11 +122,7 @@ class EdgePartitionSuite extends FunSuite {
for (ser <- List(javaSer, kryoSer); s = ser.newInstance()) {
val aSer: EdgePartition[Int, Int] = s.deserialize(s.serialize(a))
- assert(aSer.srcIds.toList === a.srcIds.toList)
- assert(aSer.dstIds.toList === a.dstIds.toList)
- assert(aSer.data.toList === a.data.toList)
- assert(aSer.index != null)
- assert(aSer.vertices.iterator.toSet === a.vertices.iterator.toSet)
+ assert(aSer.tripletIterator().toList === a.tripletIterator().toList)
}
}
}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala
deleted file mode 100644
index 49b2704390..0000000000
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.graphx.impl
-
-import scala.reflect.ClassTag
-import scala.util.Random
-
-import org.scalatest.FunSuite
-
-import org.apache.spark.graphx._
-
-class EdgeTripletIteratorSuite extends FunSuite {
- test("iterator.toList") {
- val builder = new EdgePartitionBuilder[Int, Int]
- builder.add(1, 2, 0)
- builder.add(1, 3, 0)
- builder.add(1, 4, 0)
- val iter = new EdgeTripletIterator[Int, Int](builder.toEdgePartition, true, true)
- val result = iter.toList.map(et => (et.srcId, et.dstId))
- assert(result === Seq((1, 2), (1, 3), (1, 4)))
- }
-}