diff options
author | Ankur Dave <ankurdave@gmail.com> | 2014-01-03 13:25:03 -0700 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2014-01-08 21:19:14 -0800 |
commit | ab861d8450140cdb0a3d9f9b830ec076d8af746d (patch) | |
tree | 1365d298cb27ec0010ca4aec7f57dd0d26a529a8 /graph/src | |
parent | 0ad75cdfb0093a0b525c598c5af4b9745581a6d7 (diff) | |
download | spark-ab861d8450140cdb0a3d9f9b830ec076d8af746d.tar.gz spark-ab861d8450140cdb0a3d9f9b830ec076d8af746d.tar.bz2 spark-ab861d8450140cdb0a3d9f9b830ec076d8af746d.zip |
Take SparkConf in constructor of Serializer subclasses
Diffstat (limited to 'graph/src')
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala | 15 | ||||
-rw-r--r-- | graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala | 30 |
2 files changed, 26 insertions, 19 deletions
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala b/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala index e4fa4a4421..dcf619fa85 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala @@ -3,10 +3,11 @@ package org.apache.spark.graph.impl import java.io.{EOFException, InputStream, OutputStream} import java.nio.ByteBuffer +import org.apache.spark.SparkConf import org.apache.spark.graph._ import org.apache.spark.serializer._ -class VidMsgSerializer extends Serializer { +class VidMsgSerializer(conf: SparkConf) extends Serializer { override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { @@ -26,7 +27,7 @@ class VidMsgSerializer extends Serializer { } /** A special shuffle serializer for VertexBroadcastMessage[Int]. */ -class IntVertexBroadcastMsgSerializer extends Serializer { +class IntVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer { override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { @@ -49,7 +50,7 @@ class IntVertexBroadcastMsgSerializer extends Serializer { } /** A special shuffle serializer for VertexBroadcastMessage[Long]. */ -class LongVertexBroadcastMsgSerializer extends Serializer { +class LongVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer { override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { @@ -72,7 +73,7 @@ class LongVertexBroadcastMsgSerializer extends Serializer { } /** A special shuffle serializer for VertexBroadcastMessage[Double]. */ -class DoubleVertexBroadcastMsgSerializer extends Serializer { +class DoubleVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer { override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { @@ -95,7 +96,7 @@ class DoubleVertexBroadcastMsgSerializer extends Serializer { } /** A special shuffle serializer for AggregationMessage[Int]. */ -class IntAggMsgSerializer extends Serializer { +class IntAggMsgSerializer(conf: SparkConf) extends Serializer { override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { @@ -118,7 +119,7 @@ class IntAggMsgSerializer extends Serializer { } /** A special shuffle serializer for AggregationMessage[Long]. */ -class LongAggMsgSerializer extends Serializer { +class LongAggMsgSerializer(conf: SparkConf) extends Serializer { override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { @@ -141,7 +142,7 @@ class LongAggMsgSerializer extends Serializer { } /** A special shuffle serializer for AggregationMessage[Double]. */ -class DoubleAggMsgSerializer extends Serializer { +class DoubleAggMsgSerializer(conf: SparkConf) extends Serializer { override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { diff --git a/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala b/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala index 80075f3437..4014cbe440 100644 --- a/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala +++ b/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala @@ -15,14 +15,15 @@ import org.apache.spark.serializer.SerializationStream class SerializerSuite extends FunSuite with LocalSparkContext { test("IntVertexBroadcastMsgSerializer") { + val conf = new SparkConf(false) val outMsg = new VertexBroadcastMsg[Int](3, 4, 5) val bout = new ByteArrayOutputStream - val outStrm = new IntVertexBroadcastMsgSerializer().newInstance().serializeStream(bout) + val outStrm = new IntVertexBroadcastMsgSerializer(conf).newInstance().serializeStream(bout) outStrm.writeObject(outMsg) outStrm.writeObject(outMsg) bout.flush() val bin = new ByteArrayInputStream(bout.toByteArray) - val inStrm = new IntVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin) + val inStrm = new IntVertexBroadcastMsgSerializer(conf).newInstance().deserializeStream(bin) val inMsg1: VertexBroadcastMsg[Int] = inStrm.readObject() val inMsg2: VertexBroadcastMsg[Int] = inStrm.readObject() assert(outMsg.vid === inMsg1.vid) @@ -36,14 +37,15 @@ class SerializerSuite extends FunSuite with LocalSparkContext { } test("LongVertexBroadcastMsgSerializer") { + val conf = new SparkConf(false) val outMsg = new VertexBroadcastMsg[Long](3, 4, 5) val bout = new ByteArrayOutputStream - val outStrm = new LongVertexBroadcastMsgSerializer().newInstance().serializeStream(bout) + val outStrm = new LongVertexBroadcastMsgSerializer(conf).newInstance().serializeStream(bout) outStrm.writeObject(outMsg) outStrm.writeObject(outMsg) bout.flush() val bin = new ByteArrayInputStream(bout.toByteArray) - val inStrm = new LongVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin) + val inStrm = new LongVertexBroadcastMsgSerializer(conf).newInstance().deserializeStream(bin) val inMsg1: VertexBroadcastMsg[Long] = inStrm.readObject() val inMsg2: VertexBroadcastMsg[Long] = inStrm.readObject() assert(outMsg.vid === inMsg1.vid) @@ -57,14 +59,15 @@ class SerializerSuite extends FunSuite with LocalSparkContext { } test("DoubleVertexBroadcastMsgSerializer") { + val conf = new SparkConf(false) val outMsg = new VertexBroadcastMsg[Double](3, 4, 5.0) val bout = new ByteArrayOutputStream - val outStrm = new DoubleVertexBroadcastMsgSerializer().newInstance().serializeStream(bout) + val outStrm = new DoubleVertexBroadcastMsgSerializer(conf).newInstance().serializeStream(bout) outStrm.writeObject(outMsg) outStrm.writeObject(outMsg) bout.flush() val bin = new ByteArrayInputStream(bout.toByteArray) - val inStrm = new DoubleVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin) + val inStrm = new DoubleVertexBroadcastMsgSerializer(conf).newInstance().deserializeStream(bin) val inMsg1: VertexBroadcastMsg[Double] = inStrm.readObject() val inMsg2: VertexBroadcastMsg[Double] = inStrm.readObject() assert(outMsg.vid === inMsg1.vid) @@ -78,14 +81,15 @@ class SerializerSuite extends FunSuite with LocalSparkContext { } test("IntAggMsgSerializer") { + val conf = new SparkConf(false) val outMsg = (4: Vid, 5) val bout = new ByteArrayOutputStream - val outStrm = new IntAggMsgSerializer().newInstance().serializeStream(bout) + val outStrm = new IntAggMsgSerializer(conf).newInstance().serializeStream(bout) outStrm.writeObject(outMsg) outStrm.writeObject(outMsg) bout.flush() val bin = new ByteArrayInputStream(bout.toByteArray) - val inStrm = new IntAggMsgSerializer().newInstance().deserializeStream(bin) + val inStrm = new IntAggMsgSerializer(conf).newInstance().deserializeStream(bin) val inMsg1: (Vid, Int) = inStrm.readObject() val inMsg2: (Vid, Int) = inStrm.readObject() assert(outMsg === inMsg1) @@ -97,14 +101,15 @@ class SerializerSuite extends FunSuite with LocalSparkContext { } test("LongAggMsgSerializer") { + val conf = new SparkConf(false) val outMsg = (4: Vid, 1L << 32) val bout = new ByteArrayOutputStream - val outStrm = new LongAggMsgSerializer().newInstance().serializeStream(bout) + val outStrm = new LongAggMsgSerializer(conf).newInstance().serializeStream(bout) outStrm.writeObject(outMsg) outStrm.writeObject(outMsg) bout.flush() val bin = new ByteArrayInputStream(bout.toByteArray) - val inStrm = new LongAggMsgSerializer().newInstance().deserializeStream(bin) + val inStrm = new LongAggMsgSerializer(conf).newInstance().deserializeStream(bin) val inMsg1: (Vid, Long) = inStrm.readObject() val inMsg2: (Vid, Long) = inStrm.readObject() assert(outMsg === inMsg1) @@ -116,14 +121,15 @@ class SerializerSuite extends FunSuite with LocalSparkContext { } test("DoubleAggMsgSerializer") { + val conf = new SparkConf(false) val outMsg = (4: Vid, 5.0) val bout = new ByteArrayOutputStream - val outStrm = new DoubleAggMsgSerializer().newInstance().serializeStream(bout) + val outStrm = new DoubleAggMsgSerializer(conf).newInstance().serializeStream(bout) outStrm.writeObject(outMsg) outStrm.writeObject(outMsg) bout.flush() val bin = new ByteArrayInputStream(bout.toByteArray) - val inStrm = new DoubleAggMsgSerializer().newInstance().deserializeStream(bin) + val inStrm = new DoubleAggMsgSerializer(conf).newInstance().deserializeStream(bin) val inMsg1: (Vid, Double) = inStrm.readObject() val inMsg2: (Vid, Double) = inStrm.readObject() assert(outMsg === inMsg1) |