aboutsummaryrefslogtreecommitdiff
path: root/graphx/src/test
diff options
context:
space:
mode:
author: Ankur Dave <ankurdave@gmail.com> 2014-07-12 12:05:34 -0700
committer: Reynold Xin <rxin@apache.org> 2014-07-12 12:05:34 -0700
commit 7a0135293192aaefc6ae20b57e15a90945bd8a4e (patch)
tree 572e99e285bbfed4a97d53ef47dfac62b818e089 /graphx/src/test
parent 2245c87af4f507cda361e16f322a14eac25b38fd (diff)
download spark-7a0135293192aaefc6ae20b57e15a90945bd8a4e.tar.gz
spark-7a0135293192aaefc6ae20b57e15a90945bd8a4e.tar.bz2
spark-7a0135293192aaefc6ae20b57e15a90945bd8a4e.zip
[SPARK-2455] Mark (Shippable)VertexPartition serializable
VertexPartition and ShippableVertexPartition are contained in RDDs but are not marked Serializable, leading to NotSerializableExceptions when using Java serialization. The fix is simply to mark them as Serializable. This PR does that and adds a test for serializing them using Java and Kryo serialization. Author: Ankur Dave <ankurdave@gmail.com> Closes #1376 from ankurdave/SPARK-2455 and squashes the following commits: ed4a51b [Ankur Dave] Make (Shippable)VertexPartition serializable 1fd42c5 [Ankur Dave] Add failing tests for Java serialization
Diffstat (limited to 'graphx/src/test')
-rw-r--r-- graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala | 24
-rw-r--r-- graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala | 20
2 files changed, 33 insertions(+), 11 deletions(-)
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
index 28fd112f2b..9d00f76327 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
@@ -23,6 +23,7 @@ import scala.util.Random
import org.scalatest.FunSuite
import org.apache.spark.SparkConf
+import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.graphx._
@@ -124,18 +125,21 @@ class EdgePartitionSuite extends FunSuite {
assert(ep.numActives == Some(2))
}
- test("Kryo serialization") {
+ test("serialization") {
val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
val a: EdgePartition[Int, Int] = makeEdgePartition(aList)
- val conf = new SparkConf()
+ val javaSer = new JavaSerializer(new SparkConf())
+ val kryoSer = new KryoSerializer(new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
- val s = new KryoSerializer(conf).newInstance()
- val aSer: EdgePartition[Int, Int] = s.deserialize(s.serialize(a))
- assert(aSer.srcIds.toList === a.srcIds.toList)
- assert(aSer.dstIds.toList === a.dstIds.toList)
- assert(aSer.data.toList === a.data.toList)
- assert(aSer.index != null)
- assert(aSer.vertices.iterator.toSet === a.vertices.iterator.toSet)
+ .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator"))
+
+ for (ser <- List(javaSer, kryoSer); s = ser.newInstance()) {
+ val aSer: EdgePartition[Int, Int] = s.deserialize(s.serialize(a))
+ assert(aSer.srcIds.toList === a.srcIds.toList)
+ assert(aSer.dstIds.toList === a.dstIds.toList)
+ assert(aSer.data.toList === a.data.toList)
+ assert(aSer.index != null)
+ assert(aSer.vertices.iterator.toSet === a.vertices.iterator.toSet)
+ }
}
}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
index 8bf1384d51..f9e771a900 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
@@ -17,9 +17,14 @@
package org.apache.spark.graphx.impl
-import org.apache.spark.graphx._
import org.scalatest.FunSuite
+import org.apache.spark.SparkConf
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.serializer.KryoSerializer
+
+import org.apache.spark.graphx._
+
class VertexPartitionSuite extends FunSuite {
test("isDefined, filter") {
@@ -116,4 +121,17 @@ class VertexPartitionSuite extends FunSuite {
assert(vp3.index.getPos(2) === -1)
}
+ test("serialization") {
+ val verts = Set((0L, 1), (1L, 1), (2L, 1))
+ val vp = VertexPartition(verts.iterator)
+ val javaSer = new JavaSerializer(new SparkConf())
+ val kryoSer = new KryoSerializer(new SparkConf()
+ .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator"))
+
+ for (ser <- List(javaSer, kryoSer); s = ser.newInstance()) {
+ val vpSer: VertexPartition[Int] = s.deserialize(s.serialize(vp))
+ assert(vpSer.iterator.toSet === verts)
+ }
+ }
}