aboutsummaryrefslogtreecommitdiff
path: root/graph/src
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2014-01-03 13:25:03 -0700
committerAnkur Dave <ankurdave@gmail.com>2014-01-08 21:19:14 -0800
commitab861d8450140cdb0a3d9f9b830ec076d8af746d (patch)
tree1365d298cb27ec0010ca4aec7f57dd0d26a529a8 /graph/src
parent0ad75cdfb0093a0b525c598c5af4b9745581a6d7 (diff)
downloadspark-ab861d8450140cdb0a3d9f9b830ec076d8af746d.tar.gz
spark-ab861d8450140cdb0a3d9f9b830ec076d8af746d.tar.bz2
spark-ab861d8450140cdb0a3d9f9b830ec076d8af746d.zip
Take SparkConf in constructor of Serializer subclasses
Diffstat (limited to 'graph/src')
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala15
-rw-r--r--graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala30
2 files changed, 26 insertions, 19 deletions
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala b/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala
index e4fa4a4421..dcf619fa85 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala
@@ -3,10 +3,11 @@ package org.apache.spark.graph.impl
import java.io.{EOFException, InputStream, OutputStream}
import java.nio.ByteBuffer
+import org.apache.spark.SparkConf
import org.apache.spark.graph._
import org.apache.spark.serializer._
-class VidMsgSerializer extends Serializer {
+class VidMsgSerializer(conf: SparkConf) extends Serializer {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
@@ -26,7 +27,7 @@ class VidMsgSerializer extends Serializer {
}
/** A special shuffle serializer for VertexBroadcastMessage[Int]. */
-class IntVertexBroadcastMsgSerializer extends Serializer {
+class IntVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
@@ -49,7 +50,7 @@ class IntVertexBroadcastMsgSerializer extends Serializer {
}
/** A special shuffle serializer for VertexBroadcastMessage[Long]. */
-class LongVertexBroadcastMsgSerializer extends Serializer {
+class LongVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
@@ -72,7 +73,7 @@ class LongVertexBroadcastMsgSerializer extends Serializer {
}
/** A special shuffle serializer for VertexBroadcastMessage[Double]. */
-class DoubleVertexBroadcastMsgSerializer extends Serializer {
+class DoubleVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
@@ -95,7 +96,7 @@ class DoubleVertexBroadcastMsgSerializer extends Serializer {
}
/** A special shuffle serializer for AggregationMessage[Int]. */
-class IntAggMsgSerializer extends Serializer {
+class IntAggMsgSerializer(conf: SparkConf) extends Serializer {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
@@ -118,7 +119,7 @@ class IntAggMsgSerializer extends Serializer {
}
/** A special shuffle serializer for AggregationMessage[Long]. */
-class LongAggMsgSerializer extends Serializer {
+class LongAggMsgSerializer(conf: SparkConf) extends Serializer {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
@@ -141,7 +142,7 @@ class LongAggMsgSerializer extends Serializer {
}
/** A special shuffle serializer for AggregationMessage[Double]. */
-class DoubleAggMsgSerializer extends Serializer {
+class DoubleAggMsgSerializer(conf: SparkConf) extends Serializer {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
diff --git a/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala b/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala
index 80075f3437..4014cbe440 100644
--- a/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala
@@ -15,14 +15,15 @@ import org.apache.spark.serializer.SerializationStream
class SerializerSuite extends FunSuite with LocalSparkContext {
test("IntVertexBroadcastMsgSerializer") {
+ val conf = new SparkConf(false)
val outMsg = new VertexBroadcastMsg[Int](3, 4, 5)
val bout = new ByteArrayOutputStream
- val outStrm = new IntVertexBroadcastMsgSerializer().newInstance().serializeStream(bout)
+ val outStrm = new IntVertexBroadcastMsgSerializer(conf).newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
- val inStrm = new IntVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin)
+ val inStrm = new IntVertexBroadcastMsgSerializer(conf).newInstance().deserializeStream(bin)
val inMsg1: VertexBroadcastMsg[Int] = inStrm.readObject()
val inMsg2: VertexBroadcastMsg[Int] = inStrm.readObject()
assert(outMsg.vid === inMsg1.vid)
@@ -36,14 +37,15 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
}
test("LongVertexBroadcastMsgSerializer") {
+ val conf = new SparkConf(false)
val outMsg = new VertexBroadcastMsg[Long](3, 4, 5)
val bout = new ByteArrayOutputStream
- val outStrm = new LongVertexBroadcastMsgSerializer().newInstance().serializeStream(bout)
+ val outStrm = new LongVertexBroadcastMsgSerializer(conf).newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
- val inStrm = new LongVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin)
+ val inStrm = new LongVertexBroadcastMsgSerializer(conf).newInstance().deserializeStream(bin)
val inMsg1: VertexBroadcastMsg[Long] = inStrm.readObject()
val inMsg2: VertexBroadcastMsg[Long] = inStrm.readObject()
assert(outMsg.vid === inMsg1.vid)
@@ -57,14 +59,15 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
}
test("DoubleVertexBroadcastMsgSerializer") {
+ val conf = new SparkConf(false)
val outMsg = new VertexBroadcastMsg[Double](3, 4, 5.0)
val bout = new ByteArrayOutputStream
- val outStrm = new DoubleVertexBroadcastMsgSerializer().newInstance().serializeStream(bout)
+ val outStrm = new DoubleVertexBroadcastMsgSerializer(conf).newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
- val inStrm = new DoubleVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin)
+ val inStrm = new DoubleVertexBroadcastMsgSerializer(conf).newInstance().deserializeStream(bin)
val inMsg1: VertexBroadcastMsg[Double] = inStrm.readObject()
val inMsg2: VertexBroadcastMsg[Double] = inStrm.readObject()
assert(outMsg.vid === inMsg1.vid)
@@ -78,14 +81,15 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
}
test("IntAggMsgSerializer") {
+ val conf = new SparkConf(false)
val outMsg = (4: Vid, 5)
val bout = new ByteArrayOutputStream
- val outStrm = new IntAggMsgSerializer().newInstance().serializeStream(bout)
+ val outStrm = new IntAggMsgSerializer(conf).newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
- val inStrm = new IntAggMsgSerializer().newInstance().deserializeStream(bin)
+ val inStrm = new IntAggMsgSerializer(conf).newInstance().deserializeStream(bin)
val inMsg1: (Vid, Int) = inStrm.readObject()
val inMsg2: (Vid, Int) = inStrm.readObject()
assert(outMsg === inMsg1)
@@ -97,14 +101,15 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
}
test("LongAggMsgSerializer") {
+ val conf = new SparkConf(false)
val outMsg = (4: Vid, 1L << 32)
val bout = new ByteArrayOutputStream
- val outStrm = new LongAggMsgSerializer().newInstance().serializeStream(bout)
+ val outStrm = new LongAggMsgSerializer(conf).newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
- val inStrm = new LongAggMsgSerializer().newInstance().deserializeStream(bin)
+ val inStrm = new LongAggMsgSerializer(conf).newInstance().deserializeStream(bin)
val inMsg1: (Vid, Long) = inStrm.readObject()
val inMsg2: (Vid, Long) = inStrm.readObject()
assert(outMsg === inMsg1)
@@ -116,14 +121,15 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
}
test("DoubleAggMsgSerializer") {
+ val conf = new SparkConf(false)
val outMsg = (4: Vid, 5.0)
val bout = new ByteArrayOutputStream
- val outStrm = new DoubleAggMsgSerializer().newInstance().serializeStream(bout)
+ val outStrm = new DoubleAggMsgSerializer(conf).newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
- val inStrm = new DoubleAggMsgSerializer().newInstance().deserializeStream(bin)
+ val inStrm = new DoubleAggMsgSerializer(conf).newInstance().deserializeStream(bin)
val inMsg1: (Vid, Double) = inStrm.readObject()
val inMsg2: (Vid, Double) = inStrm.readObject()
assert(outMsg === inMsg1)