aboutsummaryrefslogtreecommitdiff
path: root/graphx/src/test/scala
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2014-07-22 22:18:30 -0700
committerReynold Xin <rxin@apache.org>2014-07-22 22:18:30 -0700
commit6c2be93f081f33e9e97e1231b0084a6a0eb4fa22 (patch)
tree351799634cd34b39d2d0263395fbfb00d6434d56 /graphx/src/test/scala
parent02e45729472a22a31629cdd17dc3836ba6810189 (diff)
downloadspark-6c2be93f081f33e9e97e1231b0084a6a0eb4fa22.tar.gz
spark-6c2be93f081f33e9e97e1231b0084a6a0eb4fa22.tar.bz2
spark-6c2be93f081f33e9e97e1231b0084a6a0eb4fa22.zip
Remove GraphX MessageToPartition for compatibility with sort-based shuffle
MessageToPartition was used in `Graph#partitionBy`. Unlike a Tuple2, it marked the key as transient to avoid sending it over the network. However, it was incompatible with sort-based shuffle (SPARK-2045) and represented only a minor optimization: for partitionBy, it improved performance by 6.3% (30.4 s to 28.5 s) and reduced communication by 5.6% (114.2 MB to 107.8 MB). Author: Ankur Dave <ankurdave@gmail.com> Closes #1537 from ankurdave/remove-MessageToPartition and squashes the following commits: f9d0054 [Ankur Dave] Remove MessageToPartition ab71364 [Ankur Dave] Remove unused VertexBroadcastMsg
Diffstat (limited to 'graphx/src/test/scala')
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala73
1 files changed, 0 insertions, 73 deletions
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala
index 91caa6b605..864cb1fdf0 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala
@@ -26,75 +26,11 @@ import org.scalatest.FunSuite
import org.apache.spark._
import org.apache.spark.graphx.impl._
-import org.apache.spark.graphx.impl.MsgRDDFunctions._
import org.apache.spark.serializer.SerializationStream
class SerializerSuite extends FunSuite with LocalSparkContext {
- test("IntVertexBroadcastMsgSerializer") {
- val outMsg = new VertexBroadcastMsg[Int](3, 4, 5)
- val bout = new ByteArrayOutputStream
- val outStrm = new IntVertexBroadcastMsgSerializer().newInstance().serializeStream(bout)
- outStrm.writeObject(outMsg)
- outStrm.writeObject(outMsg)
- bout.flush()
- val bin = new ByteArrayInputStream(bout.toByteArray)
- val inStrm = new IntVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin)
- val inMsg1: VertexBroadcastMsg[Int] = inStrm.readObject()
- val inMsg2: VertexBroadcastMsg[Int] = inStrm.readObject()
- assert(outMsg.vid === inMsg1.vid)
- assert(outMsg.vid === inMsg2.vid)
- assert(outMsg.data === inMsg1.data)
- assert(outMsg.data === inMsg2.data)
-
- intercept[EOFException] {
- inStrm.readObject()
- }
- }
-
- test("LongVertexBroadcastMsgSerializer") {
- val outMsg = new VertexBroadcastMsg[Long](3, 4, 5)
- val bout = new ByteArrayOutputStream
- val outStrm = new LongVertexBroadcastMsgSerializer().newInstance().serializeStream(bout)
- outStrm.writeObject(outMsg)
- outStrm.writeObject(outMsg)
- bout.flush()
- val bin = new ByteArrayInputStream(bout.toByteArray)
- val inStrm = new LongVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin)
- val inMsg1: VertexBroadcastMsg[Long] = inStrm.readObject()
- val inMsg2: VertexBroadcastMsg[Long] = inStrm.readObject()
- assert(outMsg.vid === inMsg1.vid)
- assert(outMsg.vid === inMsg2.vid)
- assert(outMsg.data === inMsg1.data)
- assert(outMsg.data === inMsg2.data)
-
- intercept[EOFException] {
- inStrm.readObject()
- }
- }
-
- test("DoubleVertexBroadcastMsgSerializer") {
- val outMsg = new VertexBroadcastMsg[Double](3, 4, 5.0)
- val bout = new ByteArrayOutputStream
- val outStrm = new DoubleVertexBroadcastMsgSerializer().newInstance().serializeStream(bout)
- outStrm.writeObject(outMsg)
- outStrm.writeObject(outMsg)
- bout.flush()
- val bin = new ByteArrayInputStream(bout.toByteArray)
- val inStrm = new DoubleVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin)
- val inMsg1: VertexBroadcastMsg[Double] = inStrm.readObject()
- val inMsg2: VertexBroadcastMsg[Double] = inStrm.readObject()
- assert(outMsg.vid === inMsg1.vid)
- assert(outMsg.vid === inMsg2.vid)
- assert(outMsg.data === inMsg1.data)
- assert(outMsg.data === inMsg2.data)
-
- intercept[EOFException] {
- inStrm.readObject()
- }
- }
-
test("IntAggMsgSerializer") {
val outMsg = (4: VertexId, 5)
val bout = new ByteArrayOutputStream
@@ -152,15 +88,6 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
}
}
- test("TestShuffleVertexBroadcastMsg") {
- withSpark { sc =>
- val bmsgs = sc.parallelize(0 until 100, 10).map { pid =>
- new VertexBroadcastMsg[Int](pid, pid, pid)
- }
- bmsgs.partitionBy(new HashPartitioner(3)).collect()
- }
- }
-
test("variable long encoding") {
def testVarLongEncoding(v: Long, optimizePositive: Boolean) {
val bout = new ByteArrayOutputStream