diff options
author | Ankur Dave <ankurdave@gmail.com> | 2014-05-11 19:20:42 -0700 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2014-05-11 19:20:42 -0700 |
commit | a6b02fb7486356493474c7f42bb714c9cce215ca (patch) | |
tree | 820865b6cb770d4f67c57e47f238769201b18370 /graphx/src/test/scala/org | |
parent | f938a155b2a9c126b292d5403aca31de83d5105a (diff) | |
download | spark-a6b02fb7486356493474c7f42bb714c9cce215ca.tar.gz spark-a6b02fb7486356493474c7f42bb714c9cce215ca.tar.bz2 spark-a6b02fb7486356493474c7f42bb714c9cce215ca.zip |
SPARK-1786: Edge Partition Serialization
This appears to address the issue with edge partition serialization; the fix is simply to register the `PrimitiveKeyOpenHashMap`. However, I noticed that GraphX has forked that code while retaining the same name, which is confusing, so I also renamed our local copy to `GraphXPrimitiveKeyOpenHashMap`. We should consider dropping the local copy and using the one in Spark if possible.
Author: Ankur Dave <ankurdave@gmail.com>
Author: Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com>
Closes #724 from jegonzal/edge_partition_serialization and squashes the following commits:
b0a525a [Ankur Dave] Disable reference tracking to fix serialization test
bb7f548 [Ankur Dave] Add failing test for EdgePartition Kryo serialization
67dac22 [Joseph E. Gonzalez] Making EdgePartition serializable.
Diffstat (limited to 'graphx/src/test/scala/org')
-rw-r--r-- | graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala | 18 |
1 file changed, 18 insertions, 0 deletions
```diff
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
index d2e0c01bc3..28fd112f2b 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
@@ -22,6 +22,9 @@ import scala.util.Random
 
 import org.scalatest.FunSuite
 
+import org.apache.spark.SparkConf
+import org.apache.spark.serializer.KryoSerializer
+
 import org.apache.spark.graphx._
 
 class EdgePartitionSuite extends FunSuite {
@@ -120,4 +123,19 @@ class EdgePartitionSuite extends FunSuite {
     assert(!ep.isActive(-1))
     assert(ep.numActives == Some(2))
   }
+
+  test("Kryo serialization") {
+    val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
+    val a: EdgePartition[Int, Int] = makeEdgePartition(aList)
+    val conf = new SparkConf()
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
+    val s = new KryoSerializer(conf).newInstance()
+    val aSer: EdgePartition[Int, Int] = s.deserialize(s.serialize(a))
+    assert(aSer.srcIds.toList === a.srcIds.toList)
+    assert(aSer.dstIds.toList === a.dstIds.toList)
+    assert(aSer.data.toList === a.data.toList)
+    assert(aSer.index != null)
+    assert(aSer.vertices.iterator.toSet === a.vertices.iterator.toSet)
+  }
 }
```