aboutsummaryrefslogtreecommitdiff
path: root/graphx/src/test
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-03-16 09:57:21 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-03-16 09:57:21 -0700
commitf5486e9f75d62919583da5ecf9a9ad00222b2227 (patch)
tree42bde2b308647eeaef2c7a92aad176916d884310 /graphx/src/test
parent97e4459e1e4cca8696535e10a91733c15f960107 (diff)
downloadspark-f5486e9f75d62919583da5ecf9a9ad00222b2227.tar.gz
spark-f5486e9f75d62919583da5ecf9a9ad00222b2227.tar.bz2
spark-f5486e9f75d62919583da5ecf9a9ad00222b2227.zip
SPARK-1255: Allow user to pass Serializer object instead of class name for shuffle.
This is more general than simply passing a string name and leaves more room for performance optimizations. Note that this is technically an API breaking change in the following two ways: 1. The shuffle serializer specification in ShuffleDependency now require an object instead of a String (of the class name), but I suspect nobody else in this world has used this API other than me in GraphX and Shark. 2. Serializer's in Spark from now on are required to be serializable. Author: Reynold Xin <rxin@apache.org> Closes #149 from rxin/serializer and squashes the following commits: 5acaccd [Reynold Xin] Properly call serializer's constructors. 2a8d75a [Reynold Xin] Added more documentation for the serializer option in ShuffleDependency. 7420185 [Reynold Xin] Allow user to pass Serializer object instead of class name for shuffle.
Diffstat (limited to 'graphx/src/test')
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala30
1 files changed, 12 insertions, 18 deletions
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala
index e5a582b47b..73438d9535 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala
@@ -32,15 +32,14 @@ import org.apache.spark.serializer.SerializationStream
class SerializerSuite extends FunSuite with LocalSparkContext {
test("IntVertexBroadcastMsgSerializer") {
- val conf = new SparkConf(false)
val outMsg = new VertexBroadcastMsg[Int](3, 4, 5)
val bout = new ByteArrayOutputStream
- val outStrm = new IntVertexBroadcastMsgSerializer(conf).newInstance().serializeStream(bout)
+ val outStrm = new IntVertexBroadcastMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
- val inStrm = new IntVertexBroadcastMsgSerializer(conf).newInstance().deserializeStream(bin)
+ val inStrm = new IntVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin)
val inMsg1: VertexBroadcastMsg[Int] = inStrm.readObject()
val inMsg2: VertexBroadcastMsg[Int] = inStrm.readObject()
assert(outMsg.vid === inMsg1.vid)
@@ -54,15 +53,14 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
}
test("LongVertexBroadcastMsgSerializer") {
- val conf = new SparkConf(false)
val outMsg = new VertexBroadcastMsg[Long](3, 4, 5)
val bout = new ByteArrayOutputStream
- val outStrm = new LongVertexBroadcastMsgSerializer(conf).newInstance().serializeStream(bout)
+ val outStrm = new LongVertexBroadcastMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
- val inStrm = new LongVertexBroadcastMsgSerializer(conf).newInstance().deserializeStream(bin)
+ val inStrm = new LongVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin)
val inMsg1: VertexBroadcastMsg[Long] = inStrm.readObject()
val inMsg2: VertexBroadcastMsg[Long] = inStrm.readObject()
assert(outMsg.vid === inMsg1.vid)
@@ -76,15 +74,14 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
}
test("DoubleVertexBroadcastMsgSerializer") {
- val conf = new SparkConf(false)
val outMsg = new VertexBroadcastMsg[Double](3, 4, 5.0)
val bout = new ByteArrayOutputStream
- val outStrm = new DoubleVertexBroadcastMsgSerializer(conf).newInstance().serializeStream(bout)
+ val outStrm = new DoubleVertexBroadcastMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
- val inStrm = new DoubleVertexBroadcastMsgSerializer(conf).newInstance().deserializeStream(bin)
+ val inStrm = new DoubleVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin)
val inMsg1: VertexBroadcastMsg[Double] = inStrm.readObject()
val inMsg2: VertexBroadcastMsg[Double] = inStrm.readObject()
assert(outMsg.vid === inMsg1.vid)
@@ -98,15 +95,14 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
}
test("IntAggMsgSerializer") {
- val conf = new SparkConf(false)
val outMsg = (4: VertexId, 5)
val bout = new ByteArrayOutputStream
- val outStrm = new IntAggMsgSerializer(conf).newInstance().serializeStream(bout)
+ val outStrm = new IntAggMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
- val inStrm = new IntAggMsgSerializer(conf).newInstance().deserializeStream(bin)
+ val inStrm = new IntAggMsgSerializer().newInstance().deserializeStream(bin)
val inMsg1: (VertexId, Int) = inStrm.readObject()
val inMsg2: (VertexId, Int) = inStrm.readObject()
assert(outMsg === inMsg1)
@@ -118,15 +114,14 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
}
test("LongAggMsgSerializer") {
- val conf = new SparkConf(false)
val outMsg = (4: VertexId, 1L << 32)
val bout = new ByteArrayOutputStream
- val outStrm = new LongAggMsgSerializer(conf).newInstance().serializeStream(bout)
+ val outStrm = new LongAggMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
- val inStrm = new LongAggMsgSerializer(conf).newInstance().deserializeStream(bin)
+ val inStrm = new LongAggMsgSerializer().newInstance().deserializeStream(bin)
val inMsg1: (VertexId, Long) = inStrm.readObject()
val inMsg2: (VertexId, Long) = inStrm.readObject()
assert(outMsg === inMsg1)
@@ -138,15 +133,14 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
}
test("DoubleAggMsgSerializer") {
- val conf = new SparkConf(false)
val outMsg = (4: VertexId, 5.0)
val bout = new ByteArrayOutputStream
- val outStrm = new DoubleAggMsgSerializer(conf).newInstance().serializeStream(bout)
+ val outStrm = new DoubleAggMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
- val inStrm = new DoubleAggMsgSerializer(conf).newInstance().deserializeStream(bin)
+ val inStrm = new DoubleAggMsgSerializer().newInstance().deserializeStream(bin)
val inMsg1: (VertexId, Double) = inStrm.readObject()
val inMsg2: (VertexId, Double) = inStrm.readObject()
assert(outMsg === inMsg1)