diff options
Diffstat (limited to 'graphx/src')
4 files changed, 26 insertions, 32 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 1d029bf009..5e9be18990 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -391,6 +391,6 @@ object GraphImpl { // TODO: Consider doing map side distinct before shuffle. new ShuffledRDD[VertexId, Int, (VertexId, Int)]( edges.collectVertexIds.map(vid => (vid, 0)), partitioner) - .setSerializer(classOf[VertexIdMsgSerializer].getName) + .setSerializer(new VertexIdMsgSerializer) } } // end of object GraphImpl diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala index e9ee09c361..fe6fe76def 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala @@ -65,11 +65,11 @@ class VertexBroadcastMsgRDDFunctions[T: ClassTag](self: RDD[VertexBroadcastMsg[T // Set a custom serializer if the data is of int or double type. if (classTag[T] == ClassTag.Int) { - rdd.setSerializer(classOf[IntVertexBroadcastMsgSerializer].getName) + rdd.setSerializer(new IntVertexBroadcastMsgSerializer) } else if (classTag[T] == ClassTag.Long) { - rdd.setSerializer(classOf[LongVertexBroadcastMsgSerializer].getName) + rdd.setSerializer(new LongVertexBroadcastMsgSerializer) } else if (classTag[T] == ClassTag.Double) { - rdd.setSerializer(classOf[DoubleVertexBroadcastMsgSerializer].getName) + rdd.setSerializer(new DoubleVertexBroadcastMsgSerializer) } rdd } @@ -104,11 +104,11 @@ object MsgRDDFunctions { // Set a custom serializer if the data is of int or double type. if (classTag[T] == ClassTag.Int) { - rdd.setSerializer(classOf[IntAggMsgSerializer].getName) + rdd.setSerializer(new IntAggMsgSerializer) } else if (classTag[T] == ClassTag.Long) { - rdd.setSerializer(classOf[LongAggMsgSerializer].getName) + rdd.setSerializer(new LongAggMsgSerializer) } else if (classTag[T] == ClassTag.Double) { - rdd.setSerializer(classOf[DoubleAggMsgSerializer].getName) + rdd.setSerializer(new DoubleAggMsgSerializer) } rdd } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala index c74d487e20..34a145e018 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala @@ -25,7 +25,7 @@ import org.apache.spark.graphx._ import org.apache.spark.serializer._ private[graphx] -class VertexIdMsgSerializer(conf: SparkConf) extends Serializer { +class VertexIdMsgSerializer extends Serializer with Serializable { override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { @@ -46,7 +46,7 @@ class VertexIdMsgSerializer(conf: SparkConf) extends Serializer { /** A special shuffle serializer for VertexBroadcastMessage[Int]. */ private[graphx] -class IntVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer { +class IntVertexBroadcastMsgSerializer extends Serializer with Serializable { override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { @@ -70,7 +70,7 @@ class IntVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer { /** A special shuffle serializer for VertexBroadcastMessage[Long]. */ private[graphx] -class LongVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer { +class LongVertexBroadcastMsgSerializer extends Serializer with Serializable { override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { @@ -94,7 +94,7 @@ class LongVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer { /** A special shuffle serializer for VertexBroadcastMessage[Double]. */ private[graphx] -class DoubleVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer { +class DoubleVertexBroadcastMsgSerializer extends Serializer with Serializable { override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { @@ -118,7 +118,7 @@ class DoubleVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer { /** A special shuffle serializer for AggregationMessage[Int]. */ private[graphx] -class IntAggMsgSerializer(conf: SparkConf) extends Serializer { +class IntAggMsgSerializer extends Serializer with Serializable { override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { @@ -142,7 +142,7 @@ class IntAggMsgSerializer(conf: SparkConf) extends Serializer { /** A special shuffle serializer for AggregationMessage[Long]. */ private[graphx] -class LongAggMsgSerializer(conf: SparkConf) extends Serializer { +class LongAggMsgSerializer extends Serializer with Serializable { override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { @@ -166,7 +166,7 @@ class LongAggMsgSerializer(conf: SparkConf) extends Serializer { /** A special shuffle serializer for AggregationMessage[Double]. */ private[graphx] -class DoubleAggMsgSerializer(conf: SparkConf) extends Serializer { +class DoubleAggMsgSerializer extends Serializer with Serializable { override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { diff --git a/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala index e5a582b47b..73438d9535 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala @@ -32,15 +32,14 @@ import org.apache.spark.serializer.SerializationStream class SerializerSuite extends FunSuite with LocalSparkContext { test("IntVertexBroadcastMsgSerializer") { - val conf = new SparkConf(false) val outMsg = new VertexBroadcastMsg[Int](3, 4, 5) val bout = new ByteArrayOutputStream - val outStrm = new IntVertexBroadcastMsgSerializer(conf).newInstance().serializeStream(bout) + val outStrm = new IntVertexBroadcastMsgSerializer().newInstance().serializeStream(bout) outStrm.writeObject(outMsg) outStrm.writeObject(outMsg) bout.flush() val bin = new ByteArrayInputStream(bout.toByteArray) - val inStrm = new IntVertexBroadcastMsgSerializer(conf).newInstance().deserializeStream(bin) + val inStrm = new IntVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin) val inMsg1: VertexBroadcastMsg[Int] = inStrm.readObject() val inMsg2: VertexBroadcastMsg[Int] = inStrm.readObject() assert(outMsg.vid === inMsg1.vid) @@ -54,15 +53,14 @@ class SerializerSuite extends FunSuite with LocalSparkContext { } test("LongVertexBroadcastMsgSerializer") { - val conf = new SparkConf(false) val outMsg = new VertexBroadcastMsg[Long](3, 4, 5) val bout = new ByteArrayOutputStream - val outStrm = new LongVertexBroadcastMsgSerializer(conf).newInstance().serializeStream(bout) + val outStrm = new LongVertexBroadcastMsgSerializer().newInstance().serializeStream(bout) outStrm.writeObject(outMsg) outStrm.writeObject(outMsg) bout.flush() val bin = new ByteArrayInputStream(bout.toByteArray) - val inStrm = new LongVertexBroadcastMsgSerializer(conf).newInstance().deserializeStream(bin) + val inStrm = new LongVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin) val inMsg1: VertexBroadcastMsg[Long] = inStrm.readObject() val inMsg2: VertexBroadcastMsg[Long] = inStrm.readObject() assert(outMsg.vid === inMsg1.vid) @@ -76,15 +74,14 @@ class SerializerSuite extends FunSuite with LocalSparkContext { } test("DoubleVertexBroadcastMsgSerializer") { - val conf = new SparkConf(false) val outMsg = new VertexBroadcastMsg[Double](3, 4, 5.0) val bout = new ByteArrayOutputStream - val outStrm = new DoubleVertexBroadcastMsgSerializer(conf).newInstance().serializeStream(bout) + val outStrm = new DoubleVertexBroadcastMsgSerializer().newInstance().serializeStream(bout) outStrm.writeObject(outMsg) outStrm.writeObject(outMsg) bout.flush() val bin = new ByteArrayInputStream(bout.toByteArray) - val inStrm = new DoubleVertexBroadcastMsgSerializer(conf).newInstance().deserializeStream(bin) + val inStrm = new DoubleVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin) val inMsg1: VertexBroadcastMsg[Double] = inStrm.readObject() val inMsg2: VertexBroadcastMsg[Double] = inStrm.readObject() assert(outMsg.vid === inMsg1.vid) @@ -98,15 +95,14 @@ class SerializerSuite extends FunSuite with LocalSparkContext { } test("IntAggMsgSerializer") { - val conf = new SparkConf(false) val outMsg = (4: VertexId, 5) val bout = new ByteArrayOutputStream - val outStrm = new IntAggMsgSerializer(conf).newInstance().serializeStream(bout) + val outStrm = new IntAggMsgSerializer().newInstance().serializeStream(bout) outStrm.writeObject(outMsg) outStrm.writeObject(outMsg) bout.flush() val bin = new ByteArrayInputStream(bout.toByteArray) - val inStrm = new IntAggMsgSerializer(conf).newInstance().deserializeStream(bin) + val inStrm = new IntAggMsgSerializer().newInstance().deserializeStream(bin) val inMsg1: (VertexId, Int) = inStrm.readObject() val inMsg2: (VertexId, Int) = inStrm.readObject() assert(outMsg === inMsg1) @@ -118,15 +114,14 @@ class SerializerSuite extends FunSuite with LocalSparkContext { } test("LongAggMsgSerializer") { - val conf = new SparkConf(false) val outMsg = (4: VertexId, 1L << 32) val bout = new ByteArrayOutputStream - val outStrm = new LongAggMsgSerializer(conf).newInstance().serializeStream(bout) + val outStrm = new LongAggMsgSerializer().newInstance().serializeStream(bout) outStrm.writeObject(outMsg) outStrm.writeObject(outMsg) bout.flush() val bin = new ByteArrayInputStream(bout.toByteArray) - val inStrm = new LongAggMsgSerializer(conf).newInstance().deserializeStream(bin) + val inStrm = new LongAggMsgSerializer().newInstance().deserializeStream(bin) val inMsg1: (VertexId, Long) = inStrm.readObject() val inMsg2: (VertexId, Long) = inStrm.readObject() assert(outMsg === inMsg1) @@ -138,15 +133,14 @@ class SerializerSuite extends FunSuite with LocalSparkContext { } test("DoubleAggMsgSerializer") { - val conf = new SparkConf(false) val outMsg = (4: VertexId, 5.0) val bout = new ByteArrayOutputStream - val outStrm = new DoubleAggMsgSerializer(conf).newInstance().serializeStream(bout) + val outStrm = new DoubleAggMsgSerializer().newInstance().serializeStream(bout) outStrm.writeObject(outMsg) outStrm.writeObject(outMsg) bout.flush() val bin = new ByteArrayInputStream(bout.toByteArray) - val inStrm = new DoubleAggMsgSerializer(conf).newInstance().deserializeStream(bin) + val inStrm = new DoubleAggMsgSerializer().newInstance().deserializeStream(bin) val inMsg1: (VertexId, Double) = inStrm.readObject() val inMsg2: (VertexId, Double) = inStrm.readObject() assert(outMsg === inMsg1) |