diff options
author | Ankur Dave <ankurdave@gmail.com> | 2014-07-22 22:18:30 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-07-22 22:18:30 -0700 |
commit | 6c2be93f081f33e9e97e1231b0084a6a0eb4fa22 (patch) | |
tree | 351799634cd34b39d2d0263395fbfb00d6434d56 /graphx/src/test | |
parent | 02e45729472a22a31629cdd17dc3836ba6810189 (diff) | |
download | spark-6c2be93f081f33e9e97e1231b0084a6a0eb4fa22.tar.gz spark-6c2be93f081f33e9e97e1231b0084a6a0eb4fa22.tar.bz2 spark-6c2be93f081f33e9e97e1231b0084a6a0eb4fa22.zip |
Remove GraphX MessageToPartition for compatibility with sort-based shuffle
MessageToPartition was used in `Graph#partitionBy`. Unlike a Tuple2, it marked the key as transient to avoid sending it over the network. However, it was incompatible with sort-based shuffle (SPARK-2045) and represented only a minor optimization: for partitionBy, it improved performance by 6.3% (30.4 s to 28.5 s) and reduced communication by 5.6% (114.2 MB to 107.8 MB).
Author: Ankur Dave <ankurdave@gmail.com>
Closes #1537 from ankurdave/remove-MessageToPartition and squashes the following commits:
f9d0054 [Ankur Dave] Remove MessageToPartition
ab71364 [Ankur Dave] Remove unused VertexBroadcastMsg
Diffstat (limited to 'graphx/src/test')
-rw-r--r-- | graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala | 73 |
1 file changed, 0 insertions, 73 deletions
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala index 91caa6b605..864cb1fdf0 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala @@ -26,75 +26,11 @@ import org.scalatest.FunSuite import org.apache.spark._ import org.apache.spark.graphx.impl._ -import org.apache.spark.graphx.impl.MsgRDDFunctions._ import org.apache.spark.serializer.SerializationStream class SerializerSuite extends FunSuite with LocalSparkContext { - test("IntVertexBroadcastMsgSerializer") { - val outMsg = new VertexBroadcastMsg[Int](3, 4, 5) - val bout = new ByteArrayOutputStream - val outStrm = new IntVertexBroadcastMsgSerializer().newInstance().serializeStream(bout) - outStrm.writeObject(outMsg) - outStrm.writeObject(outMsg) - bout.flush() - val bin = new ByteArrayInputStream(bout.toByteArray) - val inStrm = new IntVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin) - val inMsg1: VertexBroadcastMsg[Int] = inStrm.readObject() - val inMsg2: VertexBroadcastMsg[Int] = inStrm.readObject() - assert(outMsg.vid === inMsg1.vid) - assert(outMsg.vid === inMsg2.vid) - assert(outMsg.data === inMsg1.data) - assert(outMsg.data === inMsg2.data) - - intercept[EOFException] { - inStrm.readObject() - } - } - - test("LongVertexBroadcastMsgSerializer") { - val outMsg = new VertexBroadcastMsg[Long](3, 4, 5) - val bout = new ByteArrayOutputStream - val outStrm = new LongVertexBroadcastMsgSerializer().newInstance().serializeStream(bout) - outStrm.writeObject(outMsg) - outStrm.writeObject(outMsg) - bout.flush() - val bin = new ByteArrayInputStream(bout.toByteArray) - val inStrm = new LongVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin) - val inMsg1: VertexBroadcastMsg[Long] = inStrm.readObject() - val inMsg2: VertexBroadcastMsg[Long] = inStrm.readObject() - assert(outMsg.vid === 
inMsg1.vid) - assert(outMsg.vid === inMsg2.vid) - assert(outMsg.data === inMsg1.data) - assert(outMsg.data === inMsg2.data) - - intercept[EOFException] { - inStrm.readObject() - } - } - - test("DoubleVertexBroadcastMsgSerializer") { - val outMsg = new VertexBroadcastMsg[Double](3, 4, 5.0) - val bout = new ByteArrayOutputStream - val outStrm = new DoubleVertexBroadcastMsgSerializer().newInstance().serializeStream(bout) - outStrm.writeObject(outMsg) - outStrm.writeObject(outMsg) - bout.flush() - val bin = new ByteArrayInputStream(bout.toByteArray) - val inStrm = new DoubleVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin) - val inMsg1: VertexBroadcastMsg[Double] = inStrm.readObject() - val inMsg2: VertexBroadcastMsg[Double] = inStrm.readObject() - assert(outMsg.vid === inMsg1.vid) - assert(outMsg.vid === inMsg2.vid) - assert(outMsg.data === inMsg1.data) - assert(outMsg.data === inMsg2.data) - - intercept[EOFException] { - inStrm.readObject() - } - } - test("IntAggMsgSerializer") { val outMsg = (4: VertexId, 5) val bout = new ByteArrayOutputStream @@ -152,15 +88,6 @@ class SerializerSuite extends FunSuite with LocalSparkContext { } } - test("TestShuffleVertexBroadcastMsg") { - withSpark { sc => - val bmsgs = sc.parallelize(0 until 100, 10).map { pid => - new VertexBroadcastMsg[Int](pid, pid, pid) - } - bmsgs.partitionBy(new HashPartitioner(3)).collect() - } - } - test("variable long encoding") { def testVarLongEncoding(v: Long, optimizePositive: Boolean) { val bout = new ByteArrayOutputStream |